直接使用
请打开OneClassSVM 算法解决异常检测问题,并点击右上角 “ 在DSW中打开” 。
Alink: 如何使用 OneClassSvm 异常检测算法做流式检测
在数据挖掘中,异常检测(英语:anomaly detection)对不匹配预期模式或数据集中其他项目的项目、事件或观测值
的识别。通常异常项目会转变成银行欺诈、网络攻击、结构缺陷、身体疾病、文本错误等类型的问题。异常也被称为
离群值、新奇、噪声、偏差和例外。
在实际业务中的异常检测往往希望检测是实时的,也就是今天我们要介绍的流式异常检测。Alink[1] 中提供了多种异
常检测算法,例如OneClassSvm、IsolationForest、LOF、SOS、BoxPlot、Ecod、Dbscan、Kde、KSigma等算法
,下面将重点介绍如何在DSW中使用OneClassSvm算法搭建一个流式异常检测业务,并且对这些异常检测算法的检
测结果进行评估。
[1] https://github.com/alibaba/Alink
运行环境要求
- PAI-DSW 官方镜像中默认已经安装了 PyAlink,内存要求 4G 及以上。
- 本 Notebook 的内容可以直接运行查看,不需要准备任何其他文件。
# 创建本地的pyalink环境,并设置并行度为2 from pyalink.alink import * useLocalEnv(2)
异常检测数据
我们这里选用两个数据集合 ForestCover[1] 来作为我们异常检测的输入数据,搭建我们的异常检测业务流程。该数据
集合的介绍及获取参见下面链接内容。
其中 ForestCover 数据集合的异常点label集合是:4。
[1] http://odds.cs.stonybrook.edu/forestcovercovertype-dataset/
forestCover = CsvSourceBatchOp() \ .setFilePath("./forest_cover.csv") \ .setSchemaStr("elevation double, aspect double, slope double, hd_to_hydrology double, vd_to_hydrology double," +" hd_to_roadways double, hillshade_9am double, hillshade_noon double, hillshade_3pm double," +" hd_to_fps double, label int") forestCover.lazyPrint(5) BatchOperator.execute()
OneClassSvm 模型
OneClassSvm 算法是一个无监督的异常检测算法,我们这里使用线性 kernel 对数据进行训练,得到异常检测模型。
我们使用 OneClassSvm 算法来检测 ForestCover 数据。具体包括如下内容:
- 训练模型,并对模型进行评估
- 评估结果比较好的模型部署成一个流服务
- 对实时数据的检测结果进行评估
- 将训练和预测同时在一个组件中完成
- 基于 Window 的实时异常检测
训练 OneClassSvm 模型,并评估
经过调参后,我们发现当只使用一个特征(elevation)时,得到的效果是最好的。这个说明在异常检测过程中,
并不是特征越多越好,有的特征是起反向作用的,只有选择合适的特征才会闹大最好的效果。
FEATURE_COLS = ["elevation"] LABEL_COL = "label" PREDICTION_COL = "pred" PREDICTION_DETAIL_COL = "pred_info" OUTLIER_VALUES = ["4"] # 异常检测模型训练 ocsvmModel = OcsvmModelOutlierTrainBatchOp() \ .setNu(0.01) \ .setKernelType("LINEAR") \ .setGamma(0.1) \ .setFeatureCols(FEATURE_COLS).linkFrom(forestCover) # 使用模型对数据检测 results = OcsvmModelOutlierPredictBatchOp() \ .setPredictionCol(PREDICTION_COL) \ .setOutlierThreshold(1.5) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL) \ .linkFrom(ocsvmModel, forestCover) # 评估模型 results.link(EvalOutlierBatchOp()\ .setLabelCol(LABEL_COL)\ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .setOutlierValueStrings(OUTLIER_VALUES)\ .lazyPrintMetrics("OCSVM forest_cover")) ocsvmModel.link(AkSinkBatchOp().setFilePath("./ocsvm_model.ak").setOverwriteSink(True)) BatchOperator.execute()
OCSVM forest_cover -------------------------------- Metrics: -------------------------------- Outlier values: [4] Normal values: [2] Auc:0.9994 Accuracy:0.9974 Precision:0.8523 Recall:0.8846 F1:0.8682 |Pred\Real|Outlier|Normal| |---------|-------|------| | Outlier| 2430| 421| | Normal| 317|282880|
评估结果比较好的模型部署成一个流服务
这里我们构造了一个假的实时数据流来辅助搭建这个异常检测流服务,我们使用上面训练好的模型对这个数据
流进行实时异常检测。
# 创建一个流数据源,实际业务中可以使用Kafka,SLS等流数据源读入实时数据 streamForestCover = CsvSourceStreamOp() \ .setFilePath("./forest_cover.csv") \ .setSchemaStr("elevation double, aspect double, slope double, hd_to_hydrology double, vd_to_hydrology double," +" hd_to_roadways double, hillshade_9am double, hillshade_noon double, hillshade_3pm double," +" hd_to_fps double, label int").link(SpeedControlStreamOp().setTimeInterval(0.0001)) # 读入已经训练好的模型 ocsvmModel = AkSourceBatchOp().setFilePath("./ocsvm_model.ak") # 使用模型搭建一个流预测服务 results = OcsvmModelOutlierPredictStreamOp(ocsvmModel) \ .setPredictionCol(PREDICTION_COL) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL) \ .linkFrom(streamForestCover) # 打印预测结果数据 results.sample(0.00002).print()
'DataStream 9ecd8b04536d4437991ec0cb133b7710 : ( Updated on 2022-10-19 12:03:45, #items received: 6 )'
对实时数据的检测结果进行评估
我们对实时预测结果使用流评估组件,实时评估某个时间间隔内样本的检测效果,并打印"Accuracy", "AUC",
"ConfusionMatrix", "F1"等指标。
# 对流预测结果进行实时评估,可以设置时间间隔,来控制评估结果输出频次 results.link(EvalOutlierStreamOp()\ .setLabelCol(LABEL_COL)\ .setTimeInterval(10) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .setOutlierValueStrings(OUTLIER_VALUES)) \ .link(JsonValueStreamOp().setSelectedCol("Data") .setReservedCols(["Statistics"]) .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix", "F1"]) .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix", "$.F1"])).print() StreamOperator.execute()
'DataStream a5dd6d52e2b249858afdfad5d1a08409 : ( Updated on 2022-10-19 12:03:51, #items received: 10 )'
将训练和预测同时在一个组件中完成(不保存模型的批式任务)
该过程中,我们可以设定一个训练模型的样本数目,算法会buffer住这些数据并训练异常检测模型并同时用来
预测这些样本。
# 对数据分批处理,每20000条数据做一次训练和预测,并输出预测结果 results = OcsvmOutlierBatchOp() \ .setNu(0.01) \ .setKernelType("LINEAR") \ .setGamma(0.1) \ .setMaxSampleNumPerGroup(20000) \ .setFeatureCols(FEATURE_COLS)\ .setPredictionCol(PREDICTION_COL) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL) \ .linkFrom(forestCover) # 对预测结果进行评估,并输出评估指标 results.link(EvalOutlierBatchOp()\ .setLabelCol(LABEL_COL)\ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .setOutlierValueStrings(OUTLIER_VALUES)\ .lazyPrintMetrics("OCSVM forest_cover"))\ .link(JsonValueBatchOp().setSelectedCol("Data")\ .setReservedCols(["Statistics"])\ .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix", "F1"])\ .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix", "$.F1"])).print()
OCSVM forest_cover -------------------------------- Metrics: -------------------------------- Outlier values: [4] Normal values: [2] Auc:0.9993 Accuracy:0.9974 Precision:0.8491 Recall:0.8831 F1:0.8658 |Pred\Real|Outlier|Normal| |---------|-------|------| | Outlier| 2426| 431| | Normal| 321|282870|
基于 Window 的实时异常检测
实时异常检测是使用当前时刻之前流过N个的样本作为训练集合训练模型对当前模型检测的算法。该算法组件每接收一
条样本时会将该条样本之前的N条样本收集起来训练一个模型,并用这个模型预测该条样本。这种方式计算量巨大,每
一条样本都会训练一个模型,所以不建议在QPS比较高的业务场景中使用。
# 读入流数据,这里可以将数据源改为Kafka,Datahub,SLS等实时数据源 streamForestCover = CsvSourceStreamOp() \ .setFilePath("./forest_cover.csv") \ .setSchemaStr("elevation double, aspect double, slope double, hd_to_hydrology double, vd_to_hydrology double," +" hd_to_roadways double, hillshade_9am double, hillshade_noon double, hillshade_3pm double," +" hd_to_fps double, label int").sample(0.02).link(SpeedControlStreamOp().setTimeInterval(0.002)) # 实时Ocsvm异常检测 results = OcsvmOutlierStreamOp()\ .setNu(0.000005)\ .setKernelType("LINEAR")\ .setEpsilon(0.0001)\ .setGamma(0.1)\ .setPrecedingRows(1000)\ .setFeatureCols(FEATURE_COLS)\ .setPredictionCol(PREDICTION_COL)\ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .linkFrom(streamForestCover) # 结果采样打印 results.select("label, pred, pred_info").sample(0.002).print()
'DataStream 4039631f01704291948e2b8b6d80075c : ( Updated on 2022-10-19 12:08:27, #items received: 11 )'
<pyalink.alink.stream.common.stream_op_7.SampleStreamOp at 0x7f96e5fe4cd0>
# 实时评估算法预测的结果 results.link(EvalOutlierStreamOp()\ .setLabelCol(LABEL_COL)\ .setTimeInterval(10) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .setOutlierValueStrings(OUTLIER_VALUES))\ .link(JsonValueStreamOp().setSelectedCol("Data")\ .setReservedCols(["Statistics"])\ .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix", "F1"])\ .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix", "$.F1"])).print() StreamOperator.execute()
'DataStream 996a42bd24e648589f5d5e931d91111b : ( Updated on 2022-10-19 12:09:10, #items received: 4 )'
总结
本文重点介绍了经典无监督异常检测算法 OneClassSVM 以及如何基于 Alink 在 DSW 上快速完成异常检测业务流程的搭
建。并且针对不同的业务场景,给出了不同的代码案例,用户可以根据自己业务场景的需求选择合适的算法调用方式最终
达到快速搭建业务流程的目的。
- 如果用户的数据已经落盘,建议使用批的方式对数据进行检测,具体参见上面的代码案例。
- 如果用户的数据是实时数据(Kafka,SLS,DataHub等),则建议用户通过流服务的方式使用这个算法。