最好大家还是看官方文档,我只是当一个搬运工
一、背景
在一些业务场景中,一个流中可能有多种类型的数据,比如订单:有线上订单,有线下订单。当需要将不同类型的数据进行分别处理,比如 写入到不同的数据表或者join 不同的其他流时,这个时候使用分流就比较合适。
二、官方常用的几种方法
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
三、示范
本文只详细介绍最常用 process 分流 ,道理都是相通的
直接上代码伪码,大家主要要理解,而不是直接复制代码
//这是订单source,最原始的流
val orderSource = 这是你构建source 的方法
//创建线上订单 tag
val onlineOrderTag = new OutputTag[JSONObject]("onlineOrder")
//创建线下订单 tag
val offlineOrderTag = new OutputTag[JSONObject]("offlineOrder")
// 这个sideOutStream 就是分流之后的流对象
val sideOutStream = orderSource
.filter(new PaymentFilter) // 这里是一个过滤逻辑,如果你没有可以不过滤
// 这个process 就是分流的操作了
.process(new ProcessFunction[String, JSONObject] {
override def processElement(orderString: String, ctx: ProcessFunction[String, JSONObject]#Context, out: Collector[JSONObject]): Unit = {
val outOrder = JSON.parseObject(orderString)
//通过收银员信息判断是否属于线下订单
if (!outOrder.containsKey("cashier_id") || StringUtils.isBlank(outOrder.getString("cashier_id"))) {
ctx.output(onlineOrderTag, outOrder)
} else {
ctx.output(offlineOrderTag, outOrder)
}
}
}
)
val onlineStream = sideOutStream.getSideOutput(onlineOrderTag)
val offlineStream = sideOutStream.getSideOutput(offlineOrderTag)
// 流已经分好了,后面是sink 还是 去干其他的,就看你的业务逻辑了
onlineStream.addSink()
offlineStream.addSink()
生产实践
下图是真实生产的一个DAG图
内部使用了分流, join ,自定义剔除器 等满足业务需求
后面会更新 join 和 自定义剔除器 trigger 等 实战场景,感兴趣的朋友可以加个关注哟