[工作流]一个任务的一辈子

简介: 参考:[https://yuque.antfin-inc.com/hanjun.hw/yvmten2/ido5af](https://yuque.antfin-inc.com/hanjun.hw/yvmten2/ido5af)引擎相关技术文档| **流程引擎到平台的mq消息链路** | [https://yuque.antfin-inc.com/hanjun.hw/yvmten2/id

参考:
https://yuque.antfin-inc.com/hanjun.hw/yvmten2/ido5af

引擎相关技术文档

流程引擎到平台的mq消息链路 https://yuque.antfin-inc.com/hanjun.hw/yvmten2/ido5af
引擎核心介绍 https://yuque.antfin-inc.com/alibpms/ssd5fr/bnn3qt
加签的基础逻辑 https://yuque.antfin-inc.com/alibpms/alipmc/append
执行任务结束如何触发下一个任务序列 https://yuque.antfin-inc.com/hanjun.hw/yvmten2/pi25gh
如何解析取人规则 https://yuque.antfin-inc.com/hanjun.hw/yvmten2/pi25gh
什么时候激发下一个taskGroup https://yuque.antfin-inc.com/hanjun.hw/hqdpat5/evis8a

总览

image.png

  1. 孕育:这一步是生命的起源,对应"任务"就是:申办人因为办理业务而发起一个流程。这是任务产生的摇篮。

任务的使命就是为了完成业务;

  1. 生产:这是新生命产生的过程,对应"任务"就是:任务的创建和激活。
  2. 长大:婴儿长大了要办满月酒,办周岁,让亲人朋友分享喜悦。对应"任务"就是:通知审批人。
  3. 进入社会:小孩长大了,迟早要进入社会,面对不同人的看法和检阅。对应"任务"就是:给审批人审阅,确认填写资料是否正确,是否合规。
  4. 经受考验:人进入社会之后,会经受各种考验,如果能够成功顶住,就能获取飞跃般的成长。对应"任务"就是:合格合规,得到审批人的认可,不用被打回去重填。
  5. 过关斩将:人在考验中不断成长,日积月累,最终会进入更高一个阶段。 对应"任务"就是:审批人点击【同意】,任务进入下一个阶段。
  6. 死得其所:人一辈子修己,完成自己的使命,最后归于黄土,是一种圆满。 对应"任务"就是:从"RUNNING"状态进入"COMPLETED"状态,光荣完成自己的使命。

名词解释

审批任务

需要人进行审批的任务,以下简称"任务"
image.png

任务的状态

 /**
     * 非激活任务
     */
    NEW,
    /**
     * 待办任务
     */
    RUNNING,
    /**
     * 暂停任务
     */
    PAUSED,
    /**
     * 取消/撤销
     */
    CANCELED,
    /**
     * 完成
     */
    COMPLETED,
    
    /**
     * 转交
     */
    REDIRECTED

问题:

  1. 转交之后原任务的状态是什么?

TaskStatus.CANCELED

审批消息

image.png

MQ消息类型(此处指业务类型)说明

(只列举部分)

消息种类 说明 备注
PROC_INST_START 流程实例启动(发起流程实例)
TASK_BATCH_ACTIVATED 任务激活
TASK_COMPLETED 任务完成
TASK_CANCEL 任务取消
TASK_REDIRECTED 任务转交
PROCESS_INSTANCE_TERMINATE 流程实例终止(比如申请人撤销)
PROC_INST_FINISH 流程实例完成
PROCESS_INSTANCE_PAUSED 流程实例暂停

生命周期

01 生成流程实例

这是任务产生的摇篮,没有流程实例,就不可能有任务。
流程实例和任务是什么关系呢?一般一个流程实例有多个审批任务
image.png

什么时候会生成流程实例呢?
一般由业务活动触发
image.png

比如上图,在采购招投标 的过程中,有很多业务活动,标号(1)到(4)的业务活动都会触发流程实例的生成,即发起审批流。

流程实例生成之后,一定会生成任务吗?
不一定。比如下面的流程,只有一个"自动节点":
image.png
不会生成审批任务。

必须要有人工节点(下图标号1-3),才能生成审批任务:
image.png
(图01-03)

发起实例流程

image.png
(图01-04)
下面是创建流程实例发起的调度项,然后把调度项存储到数据库,再由调度引擎进行一步的调度:

ProcessInstanceStartResumptionExtension extension = new ProcessInstanceStartResumptionExtension(processInstance.getProcessId(), this.getGroupType());
            extension.setParentProcessInstanceId(processInstance.getParentProcessInstanceId());
            extension.setActivityId(activityId);
            ResumptionDO resumption = new ResumptionDO(processInstance.getProcessInstanceId(), extension, processInstance.getAppKey());
            this.flowEngine.asyncStartFlowByResumption(resumption);

asyncStartFlowByResumption 里面做了什么呢?
其实就是把调度项插入到数据库
**this**.resumptionRepositoryManager.insert(resumption, **false**);

在调度项处理程序中调用WorkflowRunnerImpl 发起流程

public void startWorkflow(String processInstanceId, long processId) {
        WorkflowContext context = createWorkflowContext(processId, processInstanceId);
        context.getWorkflow().start(context);
    }

会从流程根节点找到第一个节点开始执行,首先找到的肯定是开始节点
image.png
(图01-05)

执行节点时,根据节点的类型,执行不同的Handler:
image.png
执行HumanActivityHandler(人工节点handle) 才能生成审批任务

发起流程实例之后,会发送一个变量更新的MQ消息:VariableChangeEvent

02 生成新任务

实例发起之后一定会生成任务吗?
只有人工节点才能生成审批任务

生成任务需要哪些准备工作?

  1. 在流程设计的时候,需要为每个人工节点 设置好业务规则(取人规则);
  2. 如果配置了条件节点,那么只有条件节点在运行态满足条件时才能走到审批节点;
  3. 发起实例,即填表(如下图)

image.png
(图01-06)

HumanActivityHandler(人工节点) 的执行逻辑

image.png

标号1:Activity activity = context.getFlowDefineModel().getActivityById(transition.getDestinationActivityId())
DefaultTransitionRouteHandler.externalExecute
标号2:ElementRuntimeManagerImpl.start
主要是检查前置拦截器,例如人工节点的前置条件
image.png
com.alibaba.flowengine.ext.workflow.impl.ElementRuntimeManagerImpl#start 中主要是校验前置规则,
实际调用handleBeforeInterceptors
image.png
其实这是开始节点,因为流程实例启动时,是从开始节点开始执行的.
image.png
其实目前activity.getActivitySetting().getBeforeInterceptors() 是预留的,还没有使用到
image.png
基本this.check(elementToStart, context)都会直接返回true

标号3:ElementRuntimeManagerImpl.enter 执行_节点进入后的行为_

标号4:AbstractActivityBehavior.enter
实际会调用BpmActivityBehavior.executeActivity
然后根据activity.getActivitySetting().getIdentifyKey()(HUMAN) 获取到对应的handler,
h获取到HumanActivityHandler
标号5:获取节点实例并执行
BpmsActivityEventBuilder.createActivityStartEvent 的任务:
发送节点实例启动ActivityInstanceStartEvent的 MQ消息
同时执行HumanActivityHandler.perform(activityContext)

标号6:HumanActivityHandler.perform();
image.png
执行节点实例有两种方式:
(1)同步
(2)异步:先作为调度项ResumptionDO,存储到数据库,然后由调度引擎来异步的执行

标号7:_异步调度_;
image.png
这里要注意的是:此时并没有真正干活,而是把真正要干活的任务存储到了调度项。
下面是异步的方式获取刚才存进去的调度项,然后真正执行:
(调度引擎是如何一步一步来执行调度任务(这里只执行人工节点实例)的)
image.png

说明
标号1:这里调度引擎开始调度后,真正干活了,所以叫ItemWorker;

标号2:completeResumptionSuccessed 方法名称感觉不太好,其实这里是真正干活了,即通过调度项找到具体的handler 去执行;
其中会调用 resumptionHandler.handle(resumption)

标号3:com.alibaba.workflowengine.adapter.resumption.handler.HumanStartResumptionHandler#handle 会做两件事;
(a)通过调度项里面的扩展信息ExtensionContext 获取节点id
image.png
(b)查询流程版本schema获取该人工节点的配置信息
image.png

标号4:taskSequenceManager.initAndActivateRootTaskSequenceInternational
是初始化人工节点,并激活;

标号5:;
标号6:激活任务序列;
HumanStartResumptionHandler.handle
taskSequenceManager.initAndActivateRootTaskSequenceInternational
image.png
initAndActivateRootTaskSequenceInternational 具体做了什么呢?

03 激活任务

激活任务的时机是什么时候?
一般任务在初始化之后,就会被激活

任务为什么还需要一个激活的动作、多此一举?
比如我收到一个审批任务时,我可能不太肯定,所以前加签给另外一个伙伴,那么我的原来审批任务的状态就变为(PAUSED),等前加签执行完之后,我的审批任务恢复为"激活"状态(RUNNING)

激活任务之后还需要发MQ消息

image.png

收到任务激活消息后做了什么处理?

此处是针对消息类型 为TASK_BATCH_ACTIVATED的 消息进行处理:
image.png
说明
标号1:MQ消费者 监听到消息之后进行处理;
标号2:根据消息种类,转交给具体的handler 进行业务处理;
标号3:根据消息种类,转交给具体的handler 进行业务处理;
标号4:调用 发送审批通知 的逻辑;
标号5:根据"任务激活"消息,获取新激活的任务信息,然后发送审批消息(支持公版钉钉和专有钉钉);
标号6:组装消息卡片,调用钉钉open API 发送工作通知(审批消息);
image.png

取消任务

04 审批环节

审批人收到任务之后,能做哪些事情?

如果审批人发现自己处理不了应该怎么办?
目前有两种方式:

  1. 转交:把审批任务转交给其他人处理,自己原先的任务就会结束掉

image.png

  1. 多人填表

image.png

审批任务一定需要人工审批吗?
不一定,(see https://bpms.alibaba-inc.com/processdesigner/newProcDesign?processId=15880317601)
如果我们在人工节点 设置了"自动完成规则",那么系统会自动执行(同意或拒绝)该任务
image.png

05 审批操作

当审批人点击【同意】、【已处理】或【通过】(按钮名称是可以修改的)时,任务发生了什么变化?
调用接口:executeTask
image.png

06 任务状态变化

任务的状态机是怎样的?
image.png
image.png

一个任务的状态变化会影响其他任务的状态吗?

07 为什么有任务组

任务组存在的核心目的是 支持审批节点的完成策略
image.png
上面的人工节点(审批节点) 配置了两个审批人,那么到底怎么样算审批通过呢?
是其中一个人审批通过 就算审批通过(规则1),还是所有审批人都通过才算审批通过(规则2)呢、
这就是完成策略
(以上面为例)实际会生成两个任务,这两个任务输入同一个 任务组(TaskGroup)
比如采用规则1,那么只要有一个人审批通过,那么直接把所在任务组(TaskGroup) 置为“完成",另外一个任务就会自动取消掉。

任务模型
任务的taskType为
MIDDLE_IN_ONE_BY_ONE 或 MIDDLE_IN_ALL_AT_ONCE

  • 同时分配(捞单)MIDDLE_IN_ALL_AT_ONCE

  • 逐级审批MIDDLE_IN_ONE_BY_ONE

08 任务完成

审批人点击审批页下面的【同意】、【已处理】或【通过】,如果不出意外,那么任务的状态就变为已完成.

09 激活下一个任务

执行完当前任务之后,如何激活下一个任务呢?
image.png
下面从标号14的地方说起:
image.png
com.alibaba.flowengine.ext.route.handler.impl.MultipleTransitionRouteHandler#externalExecute
image.png

  1. 找到当前节点的出线,
  2. 执行线规则;
  3. 找到出线的startElement,
  4. 按照前面的步骤执行 startElement

z执行开始节点

流程实例启动之后,第一个执行的就是开始节点实例
image.png
也是通过 com.alibaba.flowengine.ext.workflow.impl.ElementRuntimeManagerImpl#start来执行的.
详细步骤:

  1. 开始节点的结构

image.png

  1. 检查节点的前置规则(目前都为空,这套规则没有用起来),最终调用 handleBeforeInterceptors
  2. 判断能够进入节点,最终调用executeActivity 方法;
  3. 发送MQ事件:ActivityInstanceStartEvent
  4. 调用StartActivityHandler .perform,直接返回true

image.png
其实这里可以考虑用来实现流程发起的白名单机制;

  1. 判断节点此时能够直接离开(完结)--通过com.alibaba.flowengine.ext.workflow.impl.ElementRuntimeManagerImpl#leave;
  2. 实际是校验后置拦截器 ,调用方法handleAfterInterceptors,基本activitySetting.getAfterInterceptors() 都为空(此处没有用起来)
  3. 如果可以离开,则执行this.countReduction,做减数;
  4. 获取路由调度器RouteHandler 进行路由,因为当前节点是开始节点,所以获取到的路由器是线规则路由(MultipleTransitionRouteHandler)
  5. z最终会执行 MultipleTransitionRouteHandler..execute(element, context);

获取开始节点的出线:
Set<Transition> outgoingTransitions = activity.getOutTransitions()

image.png

  1. 通过检查mei每个出线的线规则,找到符合要求的出线,如果检查出线呢?通过 com.alibaba.flowengine.ext.transition.behavior.AbstractTransitionBehavior#evaluate ,elementRuntimeManager.check
  2. 判断能够进入出线 : this.elementRuntimeManager.enter(transition, context)
  3. 最终执行完路由,找到出线的出口节点transition.getDestinationActivityId(),即下一个人工节点;

image.png

  1. 然后又回到执行 com.alibaba.flowengine.ext.workflow.impl.ElementRuntimeManagerImpl#start,

此处其实是回到了步骤2,又开始下一轮循环

这里应该有一个误区,以为真个过程都是同步的.其实不是的.
上面是开始节点的执行,下面我们开始执行人工节点,其实人工节点的执行就是异步的.

人工节点的执行到底干了啥?

  1. 校验 activity.getActivitySetting().getBeforeInterceptors();
  2. 通过 activity.getActivitySetting().getIdentifyKey() 获取到具体的handler来执行,如果是人工节点,那么handler就是HumanActivityHandler;
  3. HumanActivityHandler 在执行过程中也分为异步和同步.如果是异步,则先把执行任务封装成为调度项存储到数据库;
  4. 调度项异步调度执行

思考

目前工作流的性能瓶颈在 发起流程实例, 因为大事务,频繁的锁流程实例导致并发能力不够高;
后续会优化的点有以下几点:

  1. 梳理整个流程流转/任务流转的状态机;
  2. 区分哪些是核心信息,哪些是非核心,如果是非核心信息,可以把一致性降低到最终一致性,提高性能;

后续的计划

  1. 流程实例发起白名单,可以通过人工加节点activity.getActivitySetting().getBeforeInterceptors() 来实现
  2. 和OA审批打通;
  3. ​自动节点能力抽取,复用到更大的业务场景;
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
【开发随记】【提效】工作习惯那些事系列之四——任务管理
【开发随记】【提效】工作习惯那些事系列之四——任务管理
118 0
【开发随记】【提效】工作习惯那些事系列之四——任务管理
|
设计模式 SQL 自然语言处理
系统架构师2023年案例分析考前冲刺
系统架构师2023年案例分析考前冲刺
191 0
|
设计模式 SQL 自然语言处理
系统架构师2022年案例分析考前冲刺
系统架构师2022年案例分析考前冲刺
244 0
|
测试技术
热饭的测开成果盘点第十四期:接口测试平台(二)
本期介绍的是接口测试平台的原型,区别于公众号内直播教程,原型平台的外观更古老但细节很多,可惜后台架构并不如直播版强壮。当时带了3-4位测开新手,边教边做。
热饭的测开成果盘点第十四期:接口测试平台(二)
|
监控 测试技术 BI
热饭的测开成果盘点第十三期:接口测试平台(一)
本期介绍的是接口测试平台的原型,区别于公众号内直播教程,原型平台的外观更古老但细节很多,可惜后台架构并不如直播版强壮。当时带了3-4位测开新手,边教边做。
热饭的测开成果盘点第十三期:接口测试平台(一)
|
测试技术
热饭的测开成果盘点第十五期:教程版接口测试平台
本期介绍的是接口测试平台-教程版,也就是本公众号长期连载的系列,很多同学都说想看看什么样了,今天就给大家展示下当前进度:
热饭的测开成果盘点第十五期:教程版接口测试平台
|
测试技术
热饭的测开成果盘点第二十一期:埋点自动化测试平台
本期介绍的是一款由旧移动测试平台改造的新测试平台。具体样式其实都和之前差不多,唯一不同的是增加了自动抓包和判断请求体。
热饭的测开成果盘点第二十一期:埋点自动化测试平台
|
存储 测试技术 BI
|
流计算
老板要我开发一个简单的工作流,15 次需求变更,我干到秃了。。
一天,老板找到我,说要做个简单的工作流引擎。 我查了一天啥是工作流,然后做出了如下版本:
209 0
老板要我开发一个简单的工作流,15 次需求变更,我干到秃了。。
|
机器学习/深度学习 前端开发 算法
Pipcook 团队有话说
Pipcook 1.0 发布了,来看看Pipcook开发团队对前端智能化的一些看法。
Pipcook 团队有话说