工作流Activiti框架的事务和并发!流程引擎中异步和排他操作详细解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 本篇文章介绍了在工作流的场景下项目中的事务操作和并发操作。主要内容包括异步操作和排他任务。通过这篇文章,可以了解到在工作流Activiti框架中处理事务操作的具体方式以及排他任务机制保证的在同一个流程实例中的Job不会并发执行。在阅读完这篇文章,基本上就能清楚地了解了工作流Activiti框架中的并发处理了。

事务和并发

异步操作

  • Activiti通过事务方式执行流程,可以根据需求定制
  • Activiti处理事务:

    • 如果触发了Activiti的操作(开始流程,完成任务,触发流程继续执行),activiti会推进流程,直到每个分支都进入等待状态
    • 抽象的说,会从流程图执行深度优先搜索,如果每个分支都遇到等待状态,就会返回
    • 等待状态是稍后需要执行任务,Activiti会把当前状态保存到数据库中,然后等待下一次触发
    • 触发可能来自外部,比如用户任务或接收到一个消息,也可能来自Activiti本身(定时器事件)

在这里插入图片描述
流程包含用户任务,服务任务和定时器事件
完成用户任务和校验地址是在同一个工作单元中,两者的成功和失败是原子性的.意味着如果服务任务抛出异常,要回滚当前事务,这样流程会退回到用户任务,用户任务就依然在数据库里
这就是activiti默认的行为.在(1)中应用或客户端线程完成任务.这会执行服务,流程推进,直到遇到一个等待状态,就是定时器(2),然后它会返回给调用者(3),并提交事务(如果事务是由Activiti开启的)

  • 有时需要自定义控制流程中事务的边界,把业务逻辑包裹在一起.这就需要使用异步执行:

在这里插入图片描述
完成了用户任务,生成一个发票,把发票发送给客户
生成发票不在同一个工作单元内了.如果生成发票出错不需要对用户任务进行回滚
Activiti实现的是完成用户任务(1),提交事务,返回给调用者应用.然后在后台的线程中,异步执行生成发票.
后台线程就是Activiti的Job执行器(一个线程池)周期对数据库的Job进行扫描:当到达"generate invoice"任务,为Activiti创建一个稍后执行的Job"消息",并保存到数据库.Job会被Job执行器获取并执行.也会给本地Job执行器一个提醒,告诉有一个新Job,来增加性能

  • 要想使用这个特性,要使用activiti:async="true" 扩展
<serviceTask id="service1" name="Generate Invoice" activiti:class="my.custom.Delegate" activiti:async="true" />
  • activiti:async可以使用到以下BPMN任务类型中:

    • task
    • serviceTask,
    • scriptTask
    • businessRuleTask
    • sendTask
    • receiveTask
    • userTask
    • subProcess
    • callActivity
  • 对于userTask,receiveTask和其他等待状态,异步执行的作用是让开始流程监听器运行在一个单独的线程或者事务中

排他任务

  • 从Activiti 5.9开始 ,JobExecutor能保证同一个流程实例中的Job不会并发执行
排他任务的产生背景

在这里插入图片描述

  • 一个并行网关,后面有三个服务任务,都设置为异步执行:

    • 这样会添加三个job到数据库里.一旦job进入数据库,就可以被jobExecutor执行了.JobExecutor会获取job,代理到工作线程的线程池中,在那里真正执行job
    • 就是说,使用异步执行,可以把任务分配给这个线程池(在集群环境,可能会使用多个线程池)
    • 产生一致性问题:

      • 考虑一下服务任务后的汇聚:当服务任务完成后,到达并发汇聚节点,需要决定是等待其他分支,还是继续向下执行
      • 就是说,对每个到达并行汇聚的分支,都需要判断是继续还是等待其他分支的一个或多个分支
  • 为什么会产生这样的问题:

    • 因为服务任务配置成使用异步执行,可能相关的job都在同一时间被获取,被JobExecutor分配给不同的工作线程执行
    • 结果是,三个单独的服务执行使用的事务在到达并发汇聚时可能重叠:

      • 如果出现了这个问题,这些事务是互相不可见的,其他事务同时到达了相同的并发汇聚,假设都在等待其他分支
      • 然而,每个事务都假设在等待其他分支,所以没有分支会越过并发汇聚继续执行,流程实例会一直在等待状态,无法继续执行
  • Activiti解决这个问题方式:

    • Activiti使用了乐观锁:

      • 当基于判断的数据看起来不是最新的时候 (因为其他事务可能在提交之前进行了修改,会在每个事务里增加数据库同一行的版本),这个时候,第一个提交的事务会成功,其他会因为乐观锁异常导致失败
      • 这就解决了上面流程的问题:

        • 如果多个分支同步到达并行汇聚,会假设都在登录,并增加父流程的版本号(流程实例)然后尝试提交
        • 第一个分支会成功提交,其他分支会因为乐观锁导致失败
        • 因为流程是被job触发的,Activiti会尝试在等待一段时间后尝试执行同一个job,这段时间可以同步网关的状态
  • Activiti乐观锁是一个很好的解决方案吗?

    • 乐观锁允许Activiti避免非一致性,确定流程不会"堵在汇聚网关": 或者所有分支都通过网关,或者数据库中的job正在尝试通过
    • 虽然这是一个对于持久性和一致性的完美解决方案,但对于上层来说不一定是期望的行为:

      • Activiti只会对同一个job重试估计次数(默认配置为3).之后,job还会在数据库里,但是不会再重试了.意味着这个操作必须手工执行job的触发
      • 如果job有非事务方面的效果,不会因为失败的事务回滚:如果“预定演唱会门票”服务没有与Activiti共享事务,重试job可能导致我们预定了过多门票
  • 针对这些问题,在Activiti中推出了新的概念:排他job
排他Job
  • 对于一个流程实例,排他任务不能同时执行两个
  • 考虑上面的流程:如果我们把服务任务申请为排他任务,JobExecutor会保证对应的job不会并发执行.
  • 会保证无论什么时候获取一个流程实例的排他任务,都会把同一个流程实例的其他任务都取出来,放在同一个工作线程中执行.保证job是顺序执行的
  • 从activiti 5.9开始,排他任务已经是默认配置.所以异步执行和定时器事件默认都是排他任务
  • 如果你想把job设置为非排他,可以使用activiti:exclusive="false" 进行配置:
<serviceTask id="service" activiti:expression="${myService.performBooking(hotel, dates)}" activiti:async="true" activiti:exclusive="false" />
  • 排他任务没有性能问题:

    • 在高负载的情况下性能是个问题,高负载意味着JobExecutor的所有工作线程都一直在忙碌着
    • 使用排他任务,Activiti可以简单的分布不同的负载.排他任务意味着同一个流程实例的异步执行会由相同的线程顺序执行
    • 但是要考虑:如果有多个流程实例时.所有其他流程实例的job也会分配给其他线程同步执行
    • 意味着虽然Activiti不会同时执行一个流程实例的排他job,但是还会同步执行多个流程实例的异步执行
    • 通过一个总体的预测,在大多数场景下,排他任务都会让单独的实例运行的更迅速.而且,对于同一流程实例中的job,需要用到的数据也会利用执行的集群节点的缓存.如果任务没有在同一个节点执行,数据就必须每次从数据库重新读取了

流程实例授权

  • 默认所有人在部署的流程定义上启动一个新流程实例,通过流程初始化授权功能定义的用户和组,web客户端可以限制哪些用户可以启动一个新流程实例
  • Activiti引擎不会校验授权定义: 这个功能只是为减轻web客户端开发者实现校验规则的难度
  • 设置方法与用户任务用户分配类似,用户或组可以使用activiti:potentialStarter标签分配为流程的默认启动者:
 <process id="potentialStarter">
     <extensionElements>
       <activiti:potentialStarter>
         <resourceAssignmentExpression>
           <formalExpression>group2, group(group3), user(user3)</formalExpression>
         </resourceAssignmentExpression>
       </activiti:potentialStarter>
     </extensionElements>
   <startEvent id="theStart"/>
   ...

user(user3)是直接引用了用户user3,group(group3)是引用了组group3.如果没显示设置,默为群组

  • 也可以使用process标签的属性activiti:candidateStarterUsersactiviti:candidateStarterGroups
 <process id="potentialStarter" activiti:candidateStarterUsers="user1, user2"
                                activiti:candidateStarterGroups="group1">
      ...
  

可以同时使用这两个属性

  • 定义流程初始化授权后,开发者可以使用如下方法获得授权定义.可以获得给定的用户能够启动哪些流程定义:
 processDefinitions = repositoryService.createProcessDefinitionQuery().startableByUser("userxxx").list();
  • 可以获得指定流程定义设置的潜在启动者对应的IdentityLink:
 identityLinks = repositoryService.getIdentityLinksForProcessDefinition("processDefinitionId"); 
  • 获得可以启动给定流程的用户列表的示例:
  List<User> authorizedUsers =  identityService().createUserQuery().potentialStarter("processDefinitionId").list(); 
  • 获得可以启动给定流程配置的群组的示例:
 List<Group> authorizedGroups =  identityService().createGroupQuery().potentialStarter("processDefinitionId").list();  

数据对象

  • BPMN提供了一种功能,可以在流程定义或子流程中定义数据对象
  • 根据BPMN规范,流程定义可以包含复杂XML结构,可以导入XSD定义
  • 对于Activiti来说 ,作为Activiti首次支持的数据对象, 可以支持如下的XSD类型
    <dataObject id="dObj1" name="StringTest" itemSubjectRef="xsd:string"/>
    <dataObject id="dObj2" name="BooleanTest" itemSubjectRef="xsd:boolean"/>
    <dataObject id="dObj3" name="DateTest" itemSubjectRef="xsd:datetime"/>
    <dataObject id="dObj4" name="DoubleTest" itemSubjectRef="xsd:double"/>
    <dataObject id="dObj5" name="IntegerTest" itemSubjectRef="xsd:int"/>
    <dataObject id="dObj6" name="LongTest" itemSubjectRef="xsd:long"/>
  • 数据对象定义会自动转换为流程变量,名称与name属性对应
  • 除了数据对象的定义之外,Activiti支持使用扩展元素来为这个变量赋予默认值:
<process id="dataObjectScope" name="Data Object Scope" isExecutable="true">
          <dataObject id="dObj123" name="StringTest123" itemSubjectRef="xsd:string">
            <extensionElements>
              <activiti:value>Testing123</activiti:value>
            </extensionElements>
          </dataObject>
相关文章
|
24天前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
110 0
|
15天前
|
存储 Java
深入探讨了Java集合框架中的HashSet和TreeSet,解析了两者在元素存储上的无序与有序特性。
【10月更文挑战第16天】本文深入探讨了Java集合框架中的HashSet和TreeSet,解析了两者在元素存储上的无序与有序特性。HashSet基于哈希表实现,添加元素时根据哈希值分布,遍历时顺序不可预测;而TreeSet利用红黑树结构,按自然顺序或自定义顺序存储元素,确保遍历时有序输出。文章还提供了示例代码,帮助读者更好地理解这两种集合类型的使用场景和内部机制。
33 3
|
22天前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
82 3
|
8天前
|
存储 关系型数据库 MySQL
MySQL MVCC深度解析:掌握并发控制的艺术
【10月更文挑战第23天】 在数据库领域,MVCC(Multi-Version Concurrency Control,多版本并发控制)是一种重要的并发控制机制,它允许多个事务并发执行而不产生冲突。MySQL作为广泛使用的数据库系统,其InnoDB存储引擎就采用了MVCC来处理事务。本文将深入探讨MySQL中的MVCC机制,帮助你在面试中自信应对相关问题。
19 3
|
28天前
|
人工智能 缓存 Java
深入解析Spring AI框架:在Java应用中实现智能化交互的关键
【10月更文挑战第12天】Spring AI 是 Spring 框架家族的新成员,旨在满足 Java 应用程序对人工智能集成的需求。它支持自然语言处理、图像识别等多种 AI 技术,并提供与云服务(如 OpenAI、Azure Cognitive Services)及本地模型的无缝集成。通过简单的配置和编码,开发者可轻松实现 AI 功能,同时应对模型切换、数据安全及性能优化等挑战。
|
18天前
|
分布式计算 Java 应用服务中间件
NettyIO框架的深度技术解析与实战
【10月更文挑战第13天】Netty是一个异步事件驱动的网络应用程序框架,由JBOSS提供,现已成为Github上的独立项目。
35 0
|
24天前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
58 0
|
24天前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
49 0
|
24天前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
55 0
|
24天前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
69 0

热门文章

最新文章

推荐镜像

更多