Activiti历史数据和流程引擎管理

Activiti在删除流程定义、流程任务和流程实例时,均会将被删除的数据保存到历史数据表中,除此之外,已经完成的流程实例以及相关数据,同样会被保存到历史数据表中,如果需要对这部分数据进行管理,则可以使用HistoryService对象。HistoryService对象中提供了查询历史数据与删除历史数据的方法。

除HistoryService之外,Activiti中还有一个ManagementService业务组件。该业务组件提供维护流程引擎的API,使用ManagementService可以查询流程引擎数据库信息、工作信息,甚至可以使用它提供的方法直接对数据库进行操作。

1、历史数据管理

在Activiti中专门使用了一套数据表来保存历史数据,这些表的命名规则为ACTHⅡXXX,例如历史附件数据会保存在ACT_HI_ATTACHMENT表中,历史任务数据会保存在ACT_HI_TASKINST表中。HistoryService只提供了历史数据查询和删除的API,另外,在初始化流程引擎的时候,Activiti会根据不同的history属性值来记录相应操作的历史数据。下面将讲解如何使用HistoryService的方法对历史数据进行查询和删除。

1.1、历史流程实例查询

使用HistoryService的createHistoricProcessInstanceQuery方法可以得到HistoricProcessInstanceQuery对象,该对象主要用于流程实例的历史数据查询,流程实例的历史数据保存在
ACT_HI_PROCINST表中,不管流程是否完成,只要创建了流程实例(启动流程),流程实例的数据(历史数据)均会被保存到ACT_HI_PROCINST表中。

历史流程实例数据查询与Activiti的其他数据查询类似,设置查询条件,然后使用list和singleResult方法返回结果。

<process id="testProcess" name="testProcess">
	<startEvent id="startevent1" name="Start"></startEvent>
	<userTask id="usertask1" name="First Task"></userTask>
	<userTask id="usertask2" name="End Task"></userTask>
	<endEvent id="endevent1" name="End"></endEvent>
	<sequenceFlow id="flow1" name="" sourceRef="startevent1" targetRef="usertask1"></sequenceFlow>
	<sequenceFlow id="flow2" name="" sourceRef="usertask1" targetRef="usertask2"></sequenceFlow>
	<sequenceFlow id="flow3" name="" sourceRef="usertask2" targetRef="endevent1"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
// 得到历史服务组件
HistoryService historyService = engine.getHistoryService();
TaskService taskService = engine.getTaskService();
// 部署流程文件
Deployment deploy = repositoryService.createDeployment().addClasspathResource("demo10/ProcessInstanceQuery.bpmn").deploy();
// 查询流程定义
ProcessDefinition define = repositoryService.createProcessDefinitionQuery().deploymentId(deploy.getId()).singleResult();
// 开始流程
ProcessInstance pi1 = runtimeService.startProcessInstanceByKey("testProcess", "businessKey1");
ProcessInstance pi2 = runtimeService.startProcessInstanceByKey("testProcess", "businessKey2");
// 完成第一条流程
Task task = taskService.createTaskQuery().processInstanceId(pi1.getId()).singleResult();
taskService.complete(task.getId());
task = taskService.createTaskQuery().processInstanceId(pi1.getId()).singleResult();
taskService.complete(task.getId());
// 查询已完成的流程
List<HistoricProcessInstance> datas = historyService.createHistoricProcessInstanceQuery().finished().list();
System.out.println("使用finished方法:" + datas.size());
// 根据流程定义ID查询
datas = historyService.createHistoricProcessInstanceQuery().processDefinitionId(define.getId()).list();
System.out.println("使用processDefinitionId方法: " + datas.size());
// 根据流程定义key(流程描述文件的process节点id属性)查询
datas = historyService.createHistoricProcessInstanceQuery().processDefinitionKey(define.getKey()).list();
System.out.println("使用processDefinitionKey方法: " + datas.size());
// 根据业务主键查询
datas = historyService.createHistoricProcessInstanceQuery().processInstanceBusinessKey("businessKey1").list();
System.out.println("使用processInstanceBusinessKey方法: " + datas.size());
// 根据流程实例ID查询
datas = historyService.createHistoricProcessInstanceQuery().processInstanceId(pi1.getId()).list();
System.out.println("使用processInstanceId方法: " + datas.size());
// 查询没有完成的流程实例
historyService.createHistoricProcessInstanceQuery().unfinished().list();
System.out.println("使用unfinished方法: " + datas.size());
使用finished方法:1
使用processDefinitionId方法: 2
使用processDefinitionKey方法: 2
使用processInstanceBusinessKey方法: 1
使用processInstanceId方法: 1
使用unfinished方法: 1

1.2、历史任务查询

与查询历史流程实例类似,使用HistoryService的createHistoricTaskInstanceQuery方法可以得到HistoricTaskInstanceQuery实例,与其他的Query对象一样,该对象提供设置查询条件和排序的方法,到历史任务数据表(ACTHⅡTASKINST)中查询符合条件的数据。使用Iist
或者singleResult方法返回的是多个或者一个HistoricTaskInstance实例。

这里有一个业务流程,流程中只有两个用户任务,对应的流程图如图所示:

在这里插入图片描述

<process id="testProcess" name="testProcess2">
	<startEvent id="startevent1" name="Start"></startEvent>
	<userTask id="usertask1" name="First Task" activiti:dueDate="${varDate1}"
		activiti:assignee="angus"></userTask>
	<userTask id="usertask2" name="End Task" activiti:dueDate="${varDate2}"
		activiti:assignee="crazyit"></userTask>
	<endEvent id="endevent1" name="End"></endEvent>
	<sequenceFlow id="flow1" name="" sourceRef="startevent1"
		targetRef="usertask1"></sequenceFlow>
	<sequenceFlow id="flow2" name="" sourceRef="usertask1"
		targetRef="usertask2"></sequenceFlow>
	<sequenceFlow id="flow3" name="" sourceRef="usertask2"
		targetRef="endevent1"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
HistoryService historyService = engine.getHistoryService();
TaskService taskService = engine.getTaskService();
Deployment deploy = repositoryService.createDeployment().addClasspathResource("demo10/TaskQuery.bpmn").deploy();
ProcessDefinition define = repositoryService.createProcessDefinitionQuery().deploymentId(deploy.getId()).singleResult();
// 初始化参数
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Map<String, Object> vars = new HashMap<String, Object>();
vars.put("varDate1", sdf.parseObject("2020-10-10 06:00:00"));
vars.put("varDate2", sdf.parseObject("2021-10-10 06:00:00"));
//开始流程
ProcessInstance pi1 = runtimeService.startProcessInstanceByKey("testProcess", "businessKey1", vars);
ProcessInstance pi2 = runtimeService.startProcessInstanceByKey("testProcess", "businessKey2", vars);
//完成流程1
Task task = taskService.createTaskQuery().processInstanceId(pi1.getId()).singleResult();
taskService.complete(task.getId());
task = taskService.createTaskQuery().processInstanceId(pi1.getId()).singleResult();
taskService.complete(task.getId());
// 流程2完成一个任务
task = taskService.createTaskQuery().processInstanceId(pi2.getId()).singleResult();
taskService.complete(task.getId());
//历史数据查询
List<HistoricTaskInstance> datas = historyService.createHistoricTaskInstanceQuery().finished().list();
System.out.println("使用finished方法查询:" + datas.size());//结果3
datas = historyService.createHistoricTaskInstanceQuery().processDefinitionId(define.getId()).list();
System.out.println("使用processDefinitionId方法查询:" + datas.size());//结果4
datas = historyService.createHistoricTaskInstanceQuery().processDefinitionKey("testProcess").list();
System.out.println("使用processDefinitionKey方法查询:" + datas.size());//结果4
datas = historyService.createHistoricTaskInstanceQuery().processDefinitionName("testProcess2").list();
System.out.println("使用processDefinitionName方法查询:" + datas.size());//结果4
datas = historyService.createHistoricTaskInstanceQuery().processFinished().list();
System.out.println("使用processFinished方法查询:" + datas.size());//结果2
datas = historyService.createHistoricTaskInstanceQuery().processInstanceId(pi2.getId()).list();
System.out.println("使用processInstanceId方法查询:" + datas.size());//结果2
datas = historyService.createHistoricTaskInstanceQuery().processUnfinished().list();
System.out.println("使用processUnfinished方法查询:" + datas.size());//结果2
datas = historyService.createHistoricTaskInstanceQuery().taskAssignee("crazyit").list();
System.out.println("使用taskAssignee方法查询:" + datas.size());//结果2
datas = historyService.createHistoricTaskInstanceQuery().taskAssigneeLike("%zy%").list();
System.out.println("使用taskAssigneeLike方法查询:" + datas.size());//结果2
datas = historyService.createHistoricTaskInstanceQuery().taskDefinitionKey("usertask1").list();
System.out.println("使用taskDefinitionKey方法查询:" + datas.size());//结果2
datas = historyService.createHistoricTaskInstanceQuery().taskDueAfter(sdf.parse("2020-10-11 06:00:00")).list();
System.out.println("使用taskDueAfter方法查询:" + datas.size());//结果2
datas = historyService.createHistoricTaskInstanceQuery().taskDueBefore(sdf.parse("2022-10-11 06:00:00")).list();
System.out.println("使用taskDueBefore方法查询:" + datas.size());//结果4
datas = historyService.createHistoricTaskInstanceQuery().taskDueDate(sdf.parse("2020-10-11 06:00:00")).list();
System.out.println("使用taskDueDate方法查询:" + datas.size());//结果0
datas = historyService.createHistoricTaskInstanceQuery().unfinished().list();
System.out.println("使用unfinished方法查询:" + datas.size());//结果1
使用finished方法查询:3
使用processDefinitionId方法查询:4
使用processDefinitionKey方法查询:4
使用processDefinitionName方法查询:4
使用processFinished方法查询:2
使用processInstanceId方法查询:2
使用processUnfinished方法查询:2
使用taskAssignee方法查询:2
使用taskAssigneeLike方法查询:2
使用taskDefinitionKey方法查询:2
使用taskDueAfter方法查询:2
使用taskDueBefore方法查询:4
使用taskDueDate方法查询:0
使用unfinished方法查询:1

1.3、历史行为查询

流程的行为数据记录在历史行为表(ACT_HI_ACTINST)中,当流程进行到一个节点时,该数据表会记录流程节点的信息,包括节点的d、节点名称、节点类型和操作时间等。使用HistoryService的createHistoricActivityInstanceQuery方法可以获取HistoricActivityInstanceQuery的查询实例,使用list或者singleResult方法返回多个或者一个HistoricActivityInstance实例。

现有一个流程,其中有一个用户任务和一个信号Catching事件,图为该流程的流程图:

在这里插入图片描述

<signal id="mySignal" name="mySignal"></signal>
<process id="testProcess" name="testProcess">
	<startEvent id="startevent1" name="Start"></startEvent>
	<userTask id="usertask1" name="First Task" activiti:assignee="crazyit"></userTask>
	<intermediateCatchEvent id="signalintermediatecatchevent1"
		name="SignalCatchEvent">
		<signalEventDefinition signalRef="mySignal"></signalEventDefinition>
	</intermediateCatchEvent>
	<endEvent id="endevent1" name="End"></endEvent>
	<sequenceFlow id="flow1" name="" sourceRef="startevent1"
		targetRef="usertask1"></sequenceFlow>
	<sequenceFlow id="flow2" name="" sourceRef="usertask1"
		targetRef="signalintermediatecatchevent1"></sequenceFlow>
	<sequenceFlow id="flow3" name=""
		sourceRef="signalintermediatecatchevent1" targetRef="endevent1"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
HistoryService historyService = engine.getHistoryService();
TaskService taskService = engine.getTaskService();
repositoryService.createDeployment().addClasspathResource("demo10/ActivityQuery.bpmn").deploy();
// 开始两条流程
ProcessInstance pi1 = runtimeService.startProcessInstanceByKey("testProcess");
ProcessInstance pi2 = runtimeService.startProcessInstanceByKey("testProcess");
// 完成第一个流程
Task task = taskService.createTaskQuery().processInstanceId(pi1.getId()).singleResult();
taskService.complete(task.getId());
runtimeService.signalEventReceived("mySignal");
// 第二个流程实例完成第一个任务
task = taskService.createTaskQuery().processInstanceId(pi2.getId()).singleResult();
taskService.complete(task.getId());
//查询数据
List<HistoricActivityInstance> datas = historyService.createHistoricActivityInstanceQuery().activityId("endevent1").list();
System.out.println("使用activityId查询:" + datas.size());//结果1
datas = historyService.createHistoricActivityInstanceQuery().activityInstanceId(datas.get(0).getId()).list();
System.out.println("使用activityInstanceId查询:" + datas.size());//结果1
datas = historyService.createHistoricActivityInstanceQuery().activityType("intermediateCatchEvent").list();
System.out.println("使用activityType查询:" + datas.size());//结果2
datas = historyService.createHistoricActivityInstanceQuery().finished().list();
System.out.println("使用finished查询:" + datas.size());//结果6
datas = historyService.createHistoricActivityInstanceQuery().processInstanceId(pi2.getId()).list();
System.out.println("使用processInstanceId查询:" + datas.size());//结果3
datas = historyService.createHistoricActivityInstanceQuery().taskAssignee("crazyit").list();
System.out.println("使用taskAssignee查询:" + datas.size());//结果2
datas = historyService.createHistoricActivityInstanceQuery().unfinished().list();
System.out.println("使用unfinished查询:" + datas.size());//结果1
使用activityId查询:1
使用activityInstanceId查询:1
使用activityType查询:2
使用finished查询:6
使用processInstanceId查询:3
使用taskAssignee查询:2
使用unfinished查询:1

1.4、历史流程明细查询

流程的明细数据包括流程参数和流程表单属性,在流程进行的过程中,会产生相当多的明细数据,例如在流程执行过程中设置参数、改变参数值等操作,这些数据会被保存到历史明细表(ACT_HI_DETAIL)中。在默认情况下,Activiti不记录这些“过程”数据,当history配置为full级别时,才会记录明细数据。

流程中的参数除了会被保存到历史明细表中外,还会被保存到历史参数表(ACT_HI_VARINST)中,需要注意的是,历史参数表中保存的是最终的参数值,而历史明细表中则会
保存参数的改变过程。

<process id="testProcess" name="testProcess" isExecutable="true">
  <startEvent id="startevent1" name="Start"></startEvent>
  <userTask id="usertask1" name="First Task"></userTask>
  <userTask id="usertask2" name="End Task"></userTask>
  <endEvent id="endevent1" name="End"></endEvent>
  <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="usertask1"></sequenceFlow>
  <sequenceFlow id="flow3" sourceRef="usertask2" targetRef="endevent1"></sequenceFlow>
  <userTask id="usertask3" name="Second Task"></userTask>
  <sequenceFlow id="flow4" sourceRef="usertask1" targetRef="usertask3"></sequenceFlow>
  <sequenceFlow id="flow5" sourceRef="usertask3" targetRef="usertask2"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
HistoryService historyService = engine.getHistoryService();
TaskService taskService = engine.getTaskService();
repositoryService.createDeployment().addClasspathResource("demo10/DetailQuery.bpmn").deploy();
// 初始化参数
Map<String, Object> vars = new HashMap<String, Object>();
vars.put("days", 1);
// 启动流程
ProcessInstance pi = runtimeService.startProcessInstanceByKey("testProcess", vars);
// 完成第一个任务
Task task = taskService.createTaskQuery().processInstanceId(pi.getId()).singleResult();
vars.put("days", 2); // 设置参数为2
taskService.complete(task.getId(), vars);
// 完成第二个任务
task = taskService.createTaskQuery().processInstanceId(pi.getId()).singleResult();
vars.put("days", 3); // 设置参数为3
taskService.complete(task.getId(), vars);
// 查询明细总数
List<HistoricDetail> datas = historyService.createHistoricDetailQuery().processInstanceId(pi.getId()).list();
System.out.println("设置三次参数后,历史明细表数据:" + datas.size());

// 查询参数表
List<HistoricVariableInstance> hisVars = historyService
        .createHistoricVariableInstanceQuery()
        .processInstanceId(pi.getId()).list();
System.out.println("参数表数据量:" + hisVars.size());
System.out.println("参数最后的值为:" + hisVars.get(0).getValue());
设置三次参数后,历史明细表数据:3
参数表数据量:1
参数最后的值为:3

1.5、删除历史流程实例和历史任务

删除历史流程实例与历史任务,会将与其关联的数据一并删除,例如参数、明细、行为等数据。在删除时需要注意的是,如果一个流程实例没有完成,在调用删除方法时,将会抛出异常。

<process id="testProcess" name="testProcess" isExecutable="true">
  <startEvent id="startevent1" name="Start"></startEvent>
  <userTask id="usertask1" name="First Task"></userTask>
  <userTask id="usertask2" name="End Task"></userTask>
  <endEvent id="endevent1" name="End"></endEvent>
  <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="usertask1"></sequenceFlow>
  <sequenceFlow id="flow3" sourceRef="usertask2" targetRef="endevent1"></sequenceFlow>
  <userTask id="usertask3" name="Second Task"></userTask>
  <sequenceFlow id="flow4" sourceRef="usertask1" targetRef="usertask3"></sequenceFlow>
  <sequenceFlow id="flow5" sourceRef="usertask3" targetRef="usertask2"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
HistoryService historyService = engine.getHistoryService();
TaskService taskService = engine.getTaskService();
repositoryService.createDeployment().addClasspathResource("demo10/Delete.bpmn").deploy();
// 开始流程
ProcessInstance pi1 = runtimeService.startProcessInstanceByKey("testProcess");
ProcessInstance pi2 = runtimeService.startProcessInstanceByKey("testProcess");
// 完成第一个流程实例
Task task = taskService.createTaskQuery().processInstanceId(pi1.getId()).singleResult();
taskService.setVariableLocal(task.getId(), "name", "crazyit");
taskService.complete(task.getId());
task = taskService.createTaskQuery().processInstanceId(pi1.getId()).singleResult();
taskService.complete(task.getId());
// 第二个流程实例完成第一个节点
task = taskService.createTaskQuery().processInstanceId(pi2.getId()).singleResult();
taskService.complete(task.getId());
System.out.println("删除前任务数量:" + historyService.createHistoricTaskInstanceQuery().count());
// 删除第二个流程实例的历史任务数据
historyService.deleteHistoricTaskInstance(task.getId());
System.out.println("删除后任务数量:" + historyService.createHistoricTaskInstanceQuery().count());
System.out.println("删除前流程实例数量:" + historyService.createHistoricProcessInstanceQuery().count());
// 删除第一个流程实例的历史流程数据
historyService.deleteHistoricProcessInstance(pi1.getId());
// 抛出错误,删除没有完成的流程实例历史数据
historyService.deleteHistoricProcessInstance(pi2.getId());
System.out.println("删除后流程实例数量:" + historyService.createHistoricProcessInstanceQuery().count());

2、工作的产生

在日常业务中,不可避免地会产生异步工作,对比,Activiti提供了工作表来保存任务。相对于Activiti5,Activiti6重构了工作执行器这一模块的逻辑,任务被归类到四个表中,例如一般的工作会被保存在一般工作表(ACT_RU_JOB)中,而流程定义与流程实例的定时中断任务会被保存到ACT_RU_SUSPENDED_JOB表中,定时任务则会被保存到
ACT_RU_TIMER_JOB表中,当满足一定条件时,这些工作将会被执行。下面将讲述一般工作的产生,让大家对工作有个初步的了解。

2.1、异步任务产生的工作

Activiti中提供了多种类型的Task,例如User Task、Script Task、Java Service Task等,Java Service Task主要用于在任务中调用外部的Java类,以下为Java Service Task在流程描述文件中的XML配置:

<serviceTask id="javaService"name="My Java Service Task" activiti:class="org.activiti.MyJavaDelegate"/>

以上的配置片断中定义了一个serviceTask,并且为该serviceTask设置了对应的Delegate(activiti:class属性)。定义一个serviceTask,可以有多种途径去指定如何使用Java类的逻辑,以上的配置片断为其中一种,这里的目的主要是介绍如何使用异步的serviceTask产生工作。以上的配置只是简单地定义一个serviceTask,如果需要定义异步的serviceTask,则需要为serviceTask节点加入activiti:async属性,并将该属性值设置为true。

<process id="async-continuation" name="async-continuation">
	<startEvent id="startevent1" name="Start"></startEvent>
	<serviceTask id="servicetask1" name="Service Task"
		activiti:async="true" 
		activiti:class="pers.zhang.delegate.MyJavaDelegate"></serviceTask>
	<endEvent id="endevent1" name="End"></endEvent>
	<userTask id="usertask1" name="Task1"></userTask>
	<sequenceFlow id="flow2" name="" sourceRef="servicetask1"
		targetRef="usertask1"></sequenceFlow>
	<sequenceFlow id="flow3" name="" sourceRef="usertask1"
		targetRef="endevent1"></sequenceFlow>
	<sequenceFlow id="flow4" name="" sourceRef="startevent1"
		targetRef="servicetask1"></sequenceFlow>
</process>
public class MyJavaDelegate implements JavaDelegate {
    public void execute(DelegateExecution execution) {
        System.out.println("This is java delegate");
    }
}
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
ManagementService managementService = engine.getManagementService();
RuntimeService runtimeService = engine.getRuntimeService();
repositoryService.createDeployment().addClasspathResource("demo10/async-continuation.bpmn").deploy();
// 产生由async-continuation处理的工作
ProcessInstance pi1 = runtimeService.startProcessInstanceByKey("async-continuation");
// 查询工作数量:1
System.out.println("工作数量:" + managementService.createJobQuery().count());

打开数据库的ACT_RU_JOB表,可以看到类型为message(TYPE字段)的工作产生,并且该工作数据的HANDLER TYPE字段值为“async-continuation”,
HANDLER TYPE字段主要用于标识该工作数据应该是哪个处理类型,“async-continuation”对应的工作处理类为AsyncContinuationJobHandler。

2.2、定时中间事件产生的工作

如果在流程中定义了定时器相关的节点,那么这些节点不会被马上执行,有可能会在某个定义的时间点上执行,这些节点会被转化为工作保存到定时器工作表(ACT_RU_TIMER_JOB)中,流程引擎会定时查询该表的数据,然后将符合执行条件(时间条件)的工作取出来交由线程池执行。在流程中可以加入定时器的节点有中间事件节点、流程开始事件节点和边界事件节点,这里所讲述的是定时中间事件(Timer Intermediate Catching Event).所产生的工作。

<process id="timer-intermediate-transition" name="timer-intermediate-transition">
	<endEvent id="endevent1" name="End"></endEvent>
	<userTask id="usertask1" name="User Task"></userTask>
	<sequenceFlow id="flow3" name="" sourceRef="usertask1"
		targetRef="endevent1"></sequenceFlow>
	<startEvent id="startevent1" name="Start"></startEvent>
	<intermediateCatchEvent id="timerintermediatecatchevent1"
		name="TimerCatchEvent">
		<timerEventDefinition>
			<timeDuration>PT1M</timeDuration>
		</timerEventDefinition>
	</intermediateCatchEvent>
	<sequenceFlow id="flow4" name="" sourceRef="startevent1"
		targetRef="timerintermediatecatchevent1"></sequenceFlow>
	<sequenceFlow id="flow5" name=""
		sourceRef="timerintermediatecatchevent1" targetRef="usertask1"></sequenceFlow>
</process>

代码定义了一个intermediateCatchEvent中间事件,并且在该事件下加入了定时事件的定义,此时该中间事件成为了一个定时中间事件。在定义定时器事件的时候,设置了timeDuration元素,值为PTlM,表示定时器将会在1分钟后触发,即1分钟后会跳过该流程节点,执行下一个节点(此处为UserTask)。

ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
repositoryService.createDeployment().addClasspathResource("demo10/timer-intermediate-transition.bpmn").deploy();
// 开始流程
runtimeService.startProcessInstanceByKey("timer-intermediate-transition");
// 查询工作数量
System.out.println("工作数量:" + engine.getManagementService().createTimerJobQuery().count());

在这里插入图片描述

2.3、定时边界事件产生的工作

与定时中间事件类似,定时边界事件(Timer Boundary Event).同样会产生定时工作。
在这里插入图片描述

<process id="timer-transition" name="process1">
	<userTask id="usertask1" name="Task1"></userTask>
	<boundaryEvent id="boundarytimer1" cancelActivity="false"
		attachedToRef="usertask1">
		<timerEventDefinition>
			<timeDuration>PT1M</timeDuration>
		</timerEventDefinition>
	</boundaryEvent>
	<endEvent id="endevent1" name="End"></endEvent>
	<sequenceFlow id="flow2" name="" sourceRef="usertask1"
		targetRef="endevent1"></sequenceFlow>
	<startEvent id="startevent1" name="Start"></startEvent>
	<sequenceFlow id="flow3" name="" sourceRef="startevent1"
		targetRef="boundarytimer1"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
repositoryService.createDeployment().addClasspathResource("demo10/timer-transition.bpmn").deploy();
// 开始流程
runtimeService.startProcessInstanceByKey("timer-transition");
// 查询工作数量
System.out.println("工作数量:" + engine.getManagementService().createTimerJobQuery().count());

2.4、定时开始事件产生的工作

在开始事件中同样可以加入定时器,让其变为定时开始事件。
在这里插入图片描述

<process id="timer-start-event" name="timer-start-event">
	<startEvent id="timerstartevent1" name="Timer start">
		<timerEventDefinition>
			<timeDuration>PT1M</timeDuration>
		</timerEventDefinition>
	</startEvent>
	<userTask id="usertask1" name="Task1"></userTask>
	<endEvent id="endevent1" name="End"></endEvent>
	<sequenceFlow id="flow1" name="" sourceRef="timerstartevent1"
		targetRef="usertask1"></sequenceFlow>
	<sequenceFlow id="flow2" name="" sourceRef="usertask1"
		targetRef="endevent1"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
repositoryService.createDeployment().addClasspathResource("demo10/timer-start-event.bpmn").deploy();
// 查询工作数量
System.out.println("工作数量:" + engine.getManagementService().createTimerJobQuery().count());

2.5、流程抛出事件产生的工作

当执行流遇到信号Catching事件时,会停留在该事件节点,一直等待信号,并且会在事件描述表(ACT_RU_EVENT_SUBSCR)中加入相应的事件描述数据。如果ACT_RU_EVENT_SUBSCR表中存在名称为“testSignal”的信号事件描述,而此时另外的流程实例(或者同一个流程实例的不同执行流)如果有一个异步的信号Throwing事件,同样使用了名称为“testSignal”的信号事件描述,那么此时将会产生工作。

在这里插入图片描述

代码定了一个信号Catching事件和一个信号Throwing事件,这两个事件均使用了相同的信号定义,并且在信号Throwing事件中,将信号定义为异步信号(activiti:async属性为true)。当流程出现分支时,会直接到达信号Catching事件,此时该执行流会一直等待信号,而另外一条分支,会先到达Task1节点,执行流经过Task1节点后,会到达信号Throwing事件,此时将会产生message类型的工作。

<signal id="testSignal" name="testSignal"></signal>
<process id="event" name="event">
	<startEvent id="startevent1" name="Start"></startEvent>
	<parallelGateway id="parallelgateway1" name="Parallel Gateway"></parallelGateway>
	<intermediateCatchEvent id="signalintermediatecatchevent1"
		name="SignalCatchEvent">
		<signalEventDefinition signalRef="testSignal"></signalEventDefinition>
	</intermediateCatchEvent>
	<userTask id="usertask1" name="User Task"></userTask>
	<endEvent id="endevent1" name="End"></endEvent>
	<parallelGateway id="parallelgateway2" name="Parallel Gateway"></parallelGateway>
	<userTask id="usertask2" name="Task1"></userTask>
	<intermediateThrowEvent id="signalintermediatethrowevent1"
		name="SignalThrowEvent" >
		<signalEventDefinition signalRef="testSignal" activiti:async="true"></signalEventDefinition>
	</intermediateThrowEvent>
	<sequenceFlow id="flow1" name="" sourceRef="startevent1"
		targetRef="parallelgateway1"></sequenceFlow>
	<sequenceFlow id="flow3" name=""
		sourceRef="signalintermediatecatchevent1" targetRef="parallelgateway2"></sequenceFlow>
	<sequenceFlow id="flow6" name="" sourceRef="parallelgateway2"
		targetRef="usertask1"></sequenceFlow>
	<sequenceFlow id="flow7" name="" sourceRef="usertask1"
		targetRef="endevent1"></sequenceFlow>
	<sequenceFlow id="flow8" name="" sourceRef="parallelgateway1"
		targetRef="usertask2"></sequenceFlow>
	<sequenceFlow id="flow9" name="" sourceRef="usertask2"
		targetRef="signalintermediatethrowevent1"></sequenceFlow>
	<sequenceFlow id="flow10" name=""
		sourceRef="signalintermediatethrowevent1" targetRef="parallelgateway2"></sequenceFlow>
	<sequenceFlow id="flow11" name="" sourceRef="parallelgateway1"
		targetRef="signalintermediatecatchevent1"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
TaskService taskService = engine.getTaskService();
// 部署流程文件
repositoryService.createDeployment().addClasspathResource("demo10/event.bpmn").deploy();
runtimeService.startProcessInstanceByKey("event");
// 将task1的工作完成后,就会产生工作
Task task = taskService.createTaskQuery().taskName("Task1").singleResult();
taskService.complete(task.getId());
// 查询工作数量
System.out.println("工作数量:" + engine.getManagementService().createJobQuery().count());

运行代码后,ACT_RU_JOB表中会产生类型为message、处理类型为“event”的工作数据,此时由于第一个流程分支会一直停在信号Catching事件中,因此事件描述表中会存在事件描述数据,当第二个流程分支遇到异步的信号Throwing事件时,会到事件描述表中查询是否存在名称为“testSignal”的事件数据,如果有,就向ACT_RU_JOB表中写入类型为message的工作数据,因此ACT_RU_JOB表的HANDLER_CFG字段值保存的是ACT_RU_EVENT_SUBSCR表中事件的ID。处理类型为“event”的工作数据会交由ProcessEventJobHandler类处理。

2.6、暂停工作的产生

流程定义与流程实例可以被中断,中断后,与这些流程定义、流程实例有关的工作,将会被保存到中断工作表(ACT_RU_SUSPENDED_JOB)中。
在这里插入图片描述

<process id="suspendJob" name="suspendJob" isExecutable="true">
  <startEvent id="startevent1" name="Start"></startEvent>
  <endEvent id="endevent1" name="End"></endEvent>
  <intermediateCatchEvent id="timerintermediatecatchevent1" name="TimerCatchEvent">
    <timerEventDefinition>
      <timeDuration>PT5M</timeDuration>
    </timerEventDefinition>
  </intermediateCatchEvent>
  <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="timerintermediatecatchevent1"></sequenceFlow>
  <sequenceFlow id="flow2" sourceRef="timerintermediatecatchevent1" targetRef="endevent1"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
ManagementService managementService = engine.getManagementService();
Deployment dep = repositoryService.createDeployment().addClasspathResource("demo10/SuspendJob.bpmn").deploy();
// 启动流程实例
ProcessInstance pi = runtimeService.startProcessInstanceByKey("suspendJob");
// 查询定时器表的数量
long timerCount = managementService.createTimerJobQuery().count();
System.out.println("中断前定时器表的数据量:" + timerCount);
// 查询中断表的数量
long suspendCount = managementService.createSuspendedJobQuery().count();
System.out.println("中断前中断表数据量:" + suspendCount);
// 中断流程实例
runtimeService.suspendProcessInstanceById(pi.getId());
// 查询定时器表的数量
timerCount = managementService.createTimerJobQuery().count();
System.out.println("中断后定时器表的数据量:" + timerCount);
// 查询中断表的数量
suspendCount = managementService.createSuspendedJobQuery().count();
System.out.println("中断后中断表数据量:" + suspendCount);
中断前定时器表的数据量:3
中断前中断表数据量:0
中断后定时器表的数据量:2
中断后中断表数据量:1

2.7、无法执行的工作

如果工作在执行时发生异常,则可以重新执行,默认情况下,工作最大执行次数为3次可以使用ManagementService的setJobRetries方法来设置重试次数。如果一个工作执行多次,仍然是失败的,那么Activiti就会将其写到ACT_RU_DEADLETTER_JOB表中,该表主要用来保存一些无法执行的工作。为了测试效果,设置一个简单流程,在其中加入一个ServiceTask,流程文件的XML如代码:

<process id="deadletter" name="deadletter" isExecutable="true">
	<endEvent id="endevent1" name="End"></endEvent>
	<startEvent id="startevent1" name="Start"></startEvent>
	<serviceTask id="servicetask1" name="Task1" activiti:async="true" 
		activiti:class="pers.zhang.delegate.JobExceptionDelegate"></serviceTask>
	<sequenceFlow id="flow3" sourceRef="servicetask1"
		targetRef="endevent1"></sequenceFlow>
	<sequenceFlow id="flow4" sourceRef="startevent1"
		targetRef="servicetask1"></sequenceFlow>
</process>
public class JobExceptionDelegate implements JavaDelegate {
    public void execute(DelegateExecution execution) {
        System.out.println("this is job exception delegate");
        throw new RuntimeException("JobExceptionDelegate message");
    }
}
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
ManagementService managementService = engine.getManagementService();
repositoryService.createDeployment().addClasspathResource("demo10/deadletter.bpmn").deploy();
// 启动流程
ProcessInstance pi = runtimeService.startProcessInstanceByKey("deadletter");
// 设置重试次数
Job job = managementService.createJobQuery().processInstanceId(pi.getProcessInstanceId()).singleResult();
managementService.setJobRetries(job.getId(), 1);
// 重新执行该工作,抛出异常
try {
    managementService.executeJob(job.getId());
} catch (Exception e) {

}
// 查询无法执行工作表
long deadCount = managementService.createDeadLetterJobQuery().count();
System.out.println("无法执行的工作,数据量:" + deadCount);

在这里插入图片描述

3、工作管理

ManagementService主要用于对流程引擎进行管理,包括工作的查询、删除、执行和数据库管理等。根据前面的讲述可知,工作会被保存到四个数据表中,因此ManagementService提供了对应的工作查询对象,这些工作查询对象最终都返回Job实例。

3.1、工作查询对象

对应存放工作的四个数据表,是以下四个工作查询对象。

  • JobQuery:到一般工作表(ACT_RU_JOB)中查询数据。
  • TimerJobQuery:到定时器工作表(ACT_RU_TIMER JOB)中查询数据。
  • SuspendedJobQuery:到中断工作表(ACT_RU_SUSPENDED_JOB)中查询数据。
  • DeadLetterJobQuery:到无法执行工作表(ACT_RU_DEADLETTER_JOB)中查询数据。

3.2、获取工作异常信息

当工作执行中出现异常时,各个工作表的EXCEPTION_STACK_ID_和EXCEPTION_MSG字段会保存工作执行的异常信息,使用ManagementService的几个getXXXJobExceptionStacktrace方法可以获取这些异常信息,该方法会根据EXCEPTION_STACK_ID字段所保存的值,到ACT_GE_BYTEARRAY表中查询相应的详细异常信息。

<process id="testMsg" name="testMsg">
	<startEvent id="timerstartevent1" name="Timer start">
	</startEvent>
	<serviceTask id="usertask1" name="Task1" activiti:async="true" 
		activiti:class="pers.zhang.delegate.JobExceptionDelegate"></serviceTask>
	<endEvent id="endevent1" name="End"></endEvent>
	<sequenceFlow id="flow1" name="" sourceRef="timerstartevent1"
		targetRef="usertask1"></sequenceFlow>
	<sequenceFlow id="flow2" name="" sourceRef="usertask1"
		targetRef="endevent1"></sequenceFlow>
</process>
ProcessEngineImpl engine = (ProcessEngineImpl) ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
repositoryService.createDeployment().addClasspathResource("demo10/jobException.bpmn").deploy();
RuntimeService runtimeService = engine.getRuntimeService();
ManagementService mService = engine.getManagementService();
// 启动流程
runtimeService.startProcessInstanceByKey("testMsg");
// 执行工作
try {
    mService.executeJob(mService.createJobQuery().singleResult().getId());
} catch (Exception e) {
}
// 查询异常信息
String msg = mService.getTimerJobExceptionStacktrace(mService.createTimerJobQuery().singleResult().getId());
System.out.println("============ 分隔线  ==============");
System.out.println(msg);

在代码中会启动一个简单流程,流程中只包含一个异步的ServiceTask,对应的处理类会抛出RuntimeException。调用ManagementService的executeJob方法执行这个流程所
产生的一般工作,由于该工作会抛出异常,因此会被迁移到定时器工作表中,等待下一次执行。

3.3、转移与删除工作

几个保存工作的数据表分别代表不同状态的工作,如果想要改变工作的状态,可以调用几个move方法来进行工作的转移。例如可以将某个工作设置为不可执行,将不可执行的工作设置为继续执行。

<process id="moveJob1" name="moveJob1">
	<startEvent id="startevent1" name="Start"></startEvent>
	<serviceTask id="servicetask1" name="Service Task"
		activiti:async="true" 
		activiti:class="org.crazyit.activiti.job.MyJavaDelegate"></serviceTask>
	<endEvent id="endevent1" name="End"></endEvent>
	<userTask id="usertask1" name="Task1"></userTask>
	<sequenceFlow id="flow2" name="" sourceRef="servicetask1"
		targetRef="usertask1"></sequenceFlow>
	<sequenceFlow id="flow3" name="" sourceRef="usertask1"
		targetRef="endevent1"></sequenceFlow>
	<sequenceFlow id="flow4" name="" sourceRef="startevent1"
		targetRef="servicetask1"></sequenceFlow>
</process>
<process id="moveJob2" name="moveJob2">
	<userTask id="usertask1" name="Task1"></userTask>
	<boundaryEvent id="boundarytimer1" cancelActivity="false"
		attachedToRef="usertask1">
		<timerEventDefinition>
			<timeDuration>PT20M</timeDuration>
		</timerEventDefinition>
	</boundaryEvent>
	<endEvent id="endevent1" name="End"></endEvent>
	<sequenceFlow id="flow2" name="" sourceRef="usertask1"
		targetRef="endevent1"></sequenceFlow>
	<startEvent id="startevent1" name="Start"></startEvent>
	<sequenceFlow id="flow3" name="" sourceRef="startevent1"
		targetRef="boundarytimer1"></sequenceFlow>
</process>
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
RepositoryService repositoryService = engine.getRepositoryService();
RuntimeService runtimeService = engine.getRuntimeService();
ManagementService mService = engine.getManagementService();
repositoryService.createDeployment().addClasspathResource("demo10/moveJob_1.bpmn").deploy();
repositoryService.createDeployment().addClasspathResource("demo10/moveJob_2.bpmn").deploy();
// 启动流程
runtimeService.startProcessInstanceByKey("moveJob1");
System.out.println("移动前一般工作数量:" + mService.createJobQuery().count()
        + ", deadletter数量:"
        + mService.createDeadLetterJobQuery().count());
// 将一般工作移动到deadletter表
mService.moveJobToDeadLetterJob(mService.createJobQuery()
        .singleResult().getId());
System.out.println("调用 moveJobToDeadLetterJob 后一般工作表数量:"
        + mService.createJobQuery().count() + ", deadletter数量:"
        + mService.createDeadLetterJobQuery().count());
// 将deadletter移动到一般工作表
mService.moveDeadLetterJobToExecutableJob(mService
        .createDeadLetterJobQuery().singleResult().getId(), 2);
System.out.println("调用 moveDeadLetterJobToExecutableJob 后一般工作表数量:"
        + mService.createJobQuery().count() + ", deadletter数量:"
        + mService.createDeadLetterJobQuery().count());
// 删除工作
mService.deleteJob(mService.createJobQuery().singleResult().getId());

// 启动第二条流程
runtimeService.startProcessInstanceByKey("moveJob2");
System.out.println("移动前工作数量:" + mService.createJobQuery().count()
        + ", 定时器工作数量:" + mService.createTimerJobQuery().count());
// 将定时器工作移动到一般工作表
mService.moveTimerToExecutableJob(mService.createTimerJobQuery().singleResult().getId());
System.out.println("调用 moveTimerToExecutableJob 后一般工作表数量:"
        + mService.createJobQuery().count() + ", 定时器工作数量:"
        + mService.createTimerJobQuery().count());
移动前一般工作数量:1, deadletter数量:0
调用 moveJobToDeadLetterJob 后一般工作表数量:0, deadletter数量:1
调用 moveDeadLetterJobToExecutableJob 后一般工作表数量:1, deadletter数量:0
移动前工作数量:0, 定时器工作数量:1
调用 moveTimerToExecutableJob 后一般工作表数量:1, 定时器工作数量:0

4、数据库管理

ManangementService除了提供操作工作的API外,还提供管理数据库的API,如果流程引擎的管理者并非技术人员,那么开发者可以利用这些API开发出数据库管理功能,让非技术人员也可以进行流程引擎管理。

4.1、查询引擎属性

Activiti会将流程引擎相关的属性配置保存到ACT GE PROPERTY表中,一些全局的、可能会发生改变的属性配置均会被放到该表中保存。使用ManagementService的getProperties方法可以返回这些属性及其值。

// 创建流程引擎
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 得到管理服务组件
ManagementService managementService = engine.getManagementService();
Map<String, String> props = managementService.getProperties();
//输出属性
for (String key : props.keySet()) {
    System.out.println("属性:" + key + " 值为:" + props.get(key));
}
属性:cfg.execution-related-entities-count 值为:false
属性:next.dbid 值为:2501
属性:schema.version 值为:6.0.0.4
属性:schema.history 值为:create(6.0.0.4)

4.2、数据表信息查询

如果需要知道一个数据表有哪些字段以及这些字段的类型,则可以使用ManagementService的getTableMetaData方法来获取某一个数据表的基础信息,该方法返回一个TableMetaData对象,通过该对象可以得到数据表的名称、表的全部字段名称以及各个字段的类型信息。如果需
要查询一个表的数据量,则可以使用ManagementService的getTableCount方法,该方法返回全部数据表的数据量,返回一个key为表名称、value为Long值的Map对象。

// 创建流程引擎
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 得到管理服务组件
ManagementService managementService = engine.getManagementService();
// 查询表信息
TableMetaData data = managementService.getTableMetaData("ACT_GE_PROPERTY");
System.out.println("输出 ACT_GE_PROPERTY 的列:");
List<String> columns = data.getColumnNames();
for (String column : columns) {
    System.out.println(column);
}
System.out.println("输出 ACT_GE_PROPERTY 的列类型:");
List<String> types = data.getColumnTypes();
for (String type : types) {
    System.out.println(type);
}
// 查询数据量
Map<String, Long> count = managementService.getTableCount();
System.out.println("ACT_GE_PROPERTY 表数据量:" + count.get("ACT_GE_PROPERTY"));
输出 ACT_GE_PROPERTY 的列:
NAME_
VALUE_
REV_
输出 ACT_GE_PROPERTY 的列类型:
VARCHAR
VARCHAR
INT
ACT_GE_PROPERTY 表数据量:4

使用getTableCount方法可以查询Activiti表的数据量,返回的Map对象的key为数据表名称,value为数据量。需要注意的是,判断是否为Activiti的数据表,标准为表名是否以“ACT”开头,如果新建一个ACT_ABC表,该方法一样会将其查询出来。

4.3、数据库操作

使用ManagementService的databaseSchemaUpgrade方法可以实现对数据库的原始操作,这些操作可以由调用人自己决定,可以创建schema、创建表、删除schema和删除表等,调用该方法需要提供Connection、数据库catalog和数据库schema参数,其中可以使用Connection的createStatement方法得到Statement对象,然后可以执行各种SQL语句。

// 创建流程引擎
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 得到管理服务组件
ManagementService managementService = engine.getManagementService();
// 创建数据库连接
String url = "jdbc:mysql://localhost:3306/activiti";
String userName = "root";
String passwd = "123456";
Connection conn = DriverManager.getConnection(url, userName, passwd);
// 创建schema
conn.createStatement().execute("create database 10_TEST");
managementService.databaseSchemaUpgrade(conn, "", "");
// 删除schema
conn = DriverManager.getConnection(url, userName, passwd);
conn.createStatement().execute("drop database 10_TEST");
managementService.databaseSchemaUpgrade(conn, "", "");
// 为10数据库创建TABLE_1表
conn = DriverManager.getConnection(url, userName, passwd);
conn.createStatement()
        .execute(
                "create TABLE TABLE_1(`ID` int(11) NOT NULL auto_increment, PRIMARY KEY  (`ID`))");
managementService.databaseSchemaUpgrade(conn, "", "");
// 删除10的TABLE_1表
conn = DriverManager.getConnection(url, userName, passwd);
conn.createStatement().execute("drop table TABLE_1");
managementService.databaseSchemaUpgrade(conn, "", "");

4.4、数据表查询

对于一些非Activiti自带的数据表,如果想对其进行数据查询,则可以使用ManagementService的createTablePageQuery方法创建一个TablePageQuery对象,使用该对象的tableName方法可以设置到哪个表进行数据查询,再使用listPage方法返回一个TablePage对象,使用listPage方法时需要传入查询的开始索引值和查询的最大数据量:

// 创建流程引擎
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
// 得到管理服务组件
ManagementService managementService = engine.getManagementService();
// 查询ACT_GE_PROPERTY表的数据
TablePage page = managementService.createTablePageQuery()
        .tableName("ACT_GE_PROPERTY").listPage(0, 2);
List<Map<String, Object>> datas = page.getRows();
for (Map<String, Object> data : datas) {
    System.out.println("============");
    for (String key : data.keySet()) {
        System.out.println(key + "---" + data.get(key));
    }
}
============
VALUE_---false
REV_---1
NAME_---cfg.execution-related-entities-count
============
VALUE_---2501
REV_---2
NAME_---next.dbid
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐