直接使用
请打开IsolationForest算法解决异常检测问题,并点击右上角 “ 在DSW中打开” 。
Alink: 如何使用 IsolationForest 异常检测算法做流式检测
在数据挖掘中,异常检测(英语:anomaly detection)对不匹配预期模式或数据集中其他项目的项目、事件或观
测值的识别。通常异常项目会转变成银行欺诈、网络攻击、结构缺陷、身体疾病、文本错误等类型的问题。异常也
被称为离群值、新奇、噪声、偏差和例外。
在实际业务中的异常检测往往希望检测是实时的,也就是今天我们要介绍的流式异常检测。Alink[1] 中提供了多种
异常检测算法,例如OneClassSvm、IsolationForest、LOF、SOS、BoxPlot、Ecod、Dbscan、Kde、KSigma等
算法,下面将重点介绍如何在DSW中使用Isolationforest算法搭建一个流式异常检测业务,并且对这些异常检测算
法的检测结果进行评估。
[1] https://github.com/alibaba/Alink
运行环境要求
- PAI-DSW 官方镜像中默认已经安装了 PyAlink,内存要求 4G 及以上。
- 本 Notebook 的内容可以直接运行查看,不需要准备任何其他文件。
# 创建本地的pyalink环境,并设置并行度为2 from pyalink.alink import * useLocalEnv(4)
异常检测数据
我们这里选用数据集合 Http (KDDCUP99)[1] 来作为我们异常检测的输入数据,搭建我们的异常检测业务流程。该数据
集合的介绍及获取参见下面链接内容。
其中Http 数据集合的异常点label集合是:"satan.", "portsweep.", "phf.", "ipsweep.", "back."。
[1] http://odds.cs.stonybrook.edu/http-kddcup99-dataset/
# 读入待检测的数据,这里我们使用的是csv数据 # 目前 pyalink 支持多种数据的导入: # 1. 批数据源:csv文件,ak文件,各种数据库等 # 2. 流数据源:datahub,kafka,SLS,csv文件构造的有限流等 kdd99Http = CsvSourceBatchOp() \ .setFilePath("./kdd99_html.csv") \ .setSchemaStr("duration double, src_bytes double, dst_bytes double, label string") kdd99Http.lazyPrint(5) BatchOperator.execute()
duration | src_bytes | dst_bytes | label | |
0 | 0.0 | 212.0 | 1131.0 | normal. |
1 | 0.0 | 205.0 | 1915.0 | normal. |
2 | 0.0 | 203.0 | 1681.0 | normal. |
3 | 0.0 | 206.0 | 1202.0 | normal. |
4 | 0.0 | 205.0 | 324.0 | normal. |
IsolationForest 模型
IsolationForest 算法是一个无监督的异常检测算法,我们首先使用该算法的训练组件得到异常检测模型。
我们使用 IsolationForest 算法来检测 Http (KDDCUP99) 数据。具体包括如下内容:
- 训练 IsolationForest 模型
- 使用模型预测并评估
- 估结果比较好的模型部署成一个流服务
- 对实时数据的检测结果进行评估
- 将训练和预测同时在一个组件中完成
- 基于 Window 的实时异常检测
训练 IsolationForest 模型
FEATURE_COLS = ["duration", "src_bytes", "dst_bytes"] LABEL_COL = "label" PREDICTION_COL = "pred" PREDICTION_DETAIL_COL = "pred_info" OUTLIER_VALUES = ["satan.", "portsweep.", "phf.", "ipsweep.", "back."] # 异常检测模型训练 iforestModel = IForestModelOutlierTrainBatchOp() \ .setNumTrees(100) \ .setFeatureCols(FEATURE_COLS).linkFrom(kdd99Http) iforestModel.link(AkSinkBatchOp().setFilePath("./iforest_model.ak").setOverwriteSink(True)) BatchOperator.execute()
使用模型预测并评估
FEATURE_COLS = ["duration", "src_bytes", "dst_bytes"] LABEL_COL = "label" PREDICTION_COL = "pred" PREDICTION_DETAIL_COL = "pred_info" OUTLIER_VALUES = ["satan.", "portsweep.", "phf.", "ipsweep.", "back."] # 读入已经训练好的模型 # iforest = AkSourceBatchOp().setFilePath("./iforest_model.ak") iforest = IForestModelOutlierTrainBatchOp() \ .setNumTrees(100) \ .setFeatureCols(FEATURE_COLS).linkFrom(kdd99Http) # 使用模型对数据检测 results = IForestModelOutlierPredictBatchOp() \ .setPredictionCol(PREDICTION_COL) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL) \ .setOutlierThreshold(0.52) \ .linkFrom(iforest, kdd99Http) # 评估模型 results.link(EvalOutlierBatchOp()\ .setLabelCol(LABEL_COL)\ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .setOutlierValueStrings(OUTLIER_VALUES)\ .lazyPrintMetrics("IForest Kdd99_html")) BatchOperator.execute()
IForest Kdd99_html -------------------------------- Metrics: -------------------------------- Outlier values: [satan., portsweep., phf., ipsweep., back.] Normal values: [normal.] Auc:0.9998 Accuracy:0.9462 Precision:0.0675 Recall:1 F1:0.1265 |Pred\Real|Outlier|Normal| |---------|-------|------| | Outlier| 2211| 30545| | Normal| 0|534742|
评估结果比较好的模型部署成一个流服务
这里我们构造了一个假的实时数据流来辅助搭建这个异常检测流服务,我们使用上面训练好的模型对这个数据流进行实时
异常检测。
# 创建一个流数据源,实际业务中可以使用Kafka,SLS等流数据源读入实时数据 streamKdd99Http = CsvSourceStreamOp() \ .setFilePath("./kdd99_html.csv") \ .setSchemaStr("duration double, src_bytes double, dst_bytes double, label string") #.link(SpeedControlStreamOp().setTimeInterval(0.1)) # 读入已经训练好的模型 iforest = AkSourceBatchOp().setFilePath("./iforest_model.ak") # 使用模型搭建一个流预测服务 results = IForestModelOutlierPredictStreamOp(iforest) \ .setPredictionCol(PREDICTION_COL) \ .setOutlierThreshold(0.666) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL) \ .linkFrom(streamKdd99Http) # 打印预测结果数据 results.sample(0.001).print() StreamOperator.execute()
'DataStream 773fe8a1e1be482980a165714e12441d : ( Updated on 2022-10-19 12:10:53, #items received: 599 )'
duration |
src_bytes | dst_bytes | label | pred | pred_info | |
1 | 0.0 | 236.0 | 1112.0 | normal. | False | {"outlier_score":"0.43928655607709555","is_out... |
2 | 0.0 | 338.0 | 935.0 | normal. | False | {"outlier_score":"0.4493311348646613","is_outl... |
3 | 0.0 | 337.0 | 406.0 | normal. | False | {"outlier_score":"0.45059479920465106","is_out... |
4 | 0.0 | 257.0 | 1758.0 | normal. | False | {"outlier_score":"0.4437314516880966","is_outl... |
5 | 0.0 | 228.0 | 2390.0 | normal. | False | {"outlier_score":"0.4397999592091132","is_outl... |
... | ... | ... | ... | ... | ... | ... |
96 | 0.0 | 342.0 | 1266.0 | normal. | False | {"outlier_score":"0.4517005960836067","is_outl... |
97 | 0.0 | 241.0 | 911.0 | normal. | False | {"outlier_score":"0.4414007159085076","is_outl... |
98 | 0.0 | 210.0 | 1691.0 | normal. | False | {"outlier_score":"0.43942882876089767","is_out... |
99 | 455.0 | 332.0 | 284.0 | normal. | False | {"outlier_score":"0.4493346755297189","is_outl... |
100 | 0.0 | 244.0 | 4498.0 | normal. | False | {"outlier_score":"0.4431503110493093","is_outl... |
100 rows × 6 columns
对实时数据的检测结果进行评估
我们对实时预测结果使用流评估组件,实时评估某个时间间隔内样本的检测效果,并打印"Accuracy", "AUC",
"ConfusionMatrix", "F1"等指标。
# 对流预测结果进行实时评估,可以设置时间间隔,来控制评估结果输出频次 results.link(EvalOutlierStreamOp()\ .setLabelCol(LABEL_COL)\ .setTimeInterval(3) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .setOutlierValueStrings(OUTLIER_VALUES)) \ .link(JsonValueStreamOp().setSelectedCol("Data") .setReservedCols(["Statistics"]) .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix", "F1"]) .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix", "$.F1"])).print() StreamOperator.execute()
'DataStream d18474f9b48243348c3bbdcc9d08831e : ( Updated on 2022-10-19 12:11:17, #items received: 12 )'
将训练和预测同时在一个组件中完成(不保存模型的批式任务)#
该过程中,我们可以设定一个训练模型的样本数目,算法会buffer住这些数据并训练异常检测模型并同时用来
预测这些样本。
# 对数据分批处理,每20000条数据做一次训练和预测,并输出预测结果 results = IForestOutlierBatchOp() \ .setMaxSampleNumPerGroup(20000) \ .setFeatureCols(FEATURE_COLS)\ .setMaxOutlierRatio(0.0039) \ .setPredictionCol(PREDICTION_COL) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL) \ .linkFrom(kdd99Http) # 对预测结果进行评估,并输出评估指标 results.link(EvalOutlierBatchOp()\ .setLabelCol(LABEL_COL)\ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .setOutlierValueStrings(OUTLIER_VALUES)\ .lazyPrintMetrics("OCSVM forest_cover"))\ .link(JsonValueBatchOp().setSelectedCol("Data")\ .setReservedCols(["Statistics"])\ .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix", "F1"])\ .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix", "$.F1"])).print()
OCSVM forest_cover -------------------------------- Metrics: -------------------------------- Outlier values: [satan., portsweep., phf., ipsweep., back.] Normal values: [normal.] Auc:0.9995 Accuracy:0.9993 Precision:0.9061 Recall:0.9073 F1:0.9067 |Pred\Real|Outlier|Normal| |---------|-------|------| | Outlier| 2006| 208| | Normal| 205|565079|
基于 Window 的实时异常检测
实时异常检测是使用当前时刻之前流过N个的样本作为训练集合训练模型对当前模型检测的算法。该算法组件每接收一条样
本时会将该条样本之前的N条样本收集起来训练一个模型,并用这个模型预测该条样本。这种方式计算量巨大,每一条样本
都会训练一个模型,所以不建议在QPS比较高的业务场景中使用。
# 读入流数据,这里可以将数据源改为Kafka,Datahub,SLS等实时数据源 streamKdd99Http = CsvSourceStreamOp() \ .setFilePath("./kdd99_html.csv") \ .setSchemaStr("duration double, src_bytes double, dst_bytes double, label string") \ .sample(0.02).link(SpeedControlStreamOp().setTimeInterval(0.002)) # 实时IForest异常检测 results = IForestOutlierStreamOp() \ .setPrecedingRows(1000) \ .setFeatureCols(FEATURE_COLS)\ .setPredictionCol(PREDICTION_COL) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL) \ .linkFrom(streamKdd99Http) # 结果采样打印 results.sample(0.002).print()
'DataStream 1efe02109051417d83a0f0d24e824269 : ( Updated on 2022-10-19 12:16:03, #items received: 25 )'
<pyalink.alink.stream.common.stream_op_7.SampleStreamOp at 0x7fd8ce5c45d0>
# 实时评估算法预测的结果 results.link(EvalOutlierStreamOp()\ .setLabelCol(LABEL_COL)\ .setTimeInterval(10) \ .setPredictionDetailCol(PREDICTION_DETAIL_COL)\ .setOutlierValueStrings(OUTLIER_VALUES))\ .link(JsonValueStreamOp().setSelectedCol("Data")\ .setReservedCols(["Statistics"])\ .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix", "F1"])\ .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix", "$.F1"])).print() StreamOperator.execute()
'DataStream b4c6a9e532e74d698f214641844e87c8 : ( Updated on 2022-10-19 12:16:04, #items received: 8 )'
总结
本文重点介绍了经典无监督异常检测算法 IsolationForest 以及如何基于 Alink 在 DSW 上快速完成异常检测业务流程的
搭建。并且针对不同的业务场景,给出了不同的代码案例,用户可以根据自己业务场景的需求,选择合适的算法调用方式
最终达到快速搭建业务流程的目的。
- 如果用户的数据已经落盘,建议使用批的方式对数据进行检测,具体参见上面的代码案例。
- 如果用户的数据是实时数据(Kafka,SLS,DataHub等),则建议用户通过流服务的方式使用这个算法。