写一个简单的工作流(四)资源的处理
2023-03-14 10:24:42 时间
昨天晚上搞到深夜,终于将资源模块搞定。到今天已经完成的功能包括:
1.四种基本路由:顺序、选择、并行、循环
2.流程定义文件和系统配置文件的读取和解析
3.使用内存作为流程数据和案例数据存储的MemoryWorkFlowDAO的开发
4.资源模块的开发
5.并发情况下的正确性测试等
计划中的功能:
1.一个GUI的流程定义工具,这个不急,也还没想好用什么做,web还是桌面?
2.各个数据库版本的WorkFlowDAO的开发,将流程数据和案例数据保存在数据库中。
3.更多的测试和example试验。
回到资源这个概念,工作流中工作项(work item)的由资源来驱动的,这个资源(resource)可能是用户、角色、定时时间或者某个事件消息。在标准petri网中,工作项对应于transition(变迁),变迁都是自动的,不需要所谓资源来驱动,显然,这与工作流系统不同。具体到insect workflow(我取的名字,小巧之意),每个transition都有一个resource,用于驱动自身的firing,所有的resource都实现Resource接口:
Transtion类的fire方法有三个操作组成:从输入库所移走token,往输出库所放入token,回调handler:
而TimerResource就需要做特殊处理了,比如我们要达到这样的效果:节点1状态已经处于就绪,可以被触发,可我们希望在就绪后延迟半分钟再触发,或者在晚上10点触发等等。这样的定时需求很常见,我采用了jdk5引入的ScheduledExecutorService来处理。系统中启动这样一个线程池,每个类似上面的请求都提交给这个线程池来处理,那么TimerResource就需要进行相应的修改:
文章转自庄周梦蝶 ,原文发布时间2007-10-13
1.四种基本路由:顺序、选择、并行、循环
2.流程定义文件和系统配置文件的读取和解析
3.使用内存作为流程数据和案例数据存储的MemoryWorkFlowDAO的开发
4.资源模块的开发
5.并发情况下的正确性测试等
计划中的功能:
1.一个GUI的流程定义工具,这个不急,也还没想好用什么做,web还是桌面?
2.各个数据库版本的WorkFlowDAO的开发,将流程数据和案例数据保存在数据库中。
3.更多的测试和example试验。
回到资源这个概念,工作流中工作项(work item)的由资源来驱动的,这个资源(resource)可能是用户、角色、定时时间或者某个事件消息。在标准petri网中,工作项对应于transition(变迁),变迁都是自动的,不需要所谓资源来驱动,显然,这与工作流系统不同。具体到insect workflow(我取的名字,小巧之意),每个transition都有一个resource,用于驱动自身的firing,所有的resource都实现Resource接口:
public interface Resource extends Serializable {
public void start(Transition transition, Token token, Object args);
public ResourceType getType();
public long getId();
}
每个资源都有一个类型,以及这个类型中独一无二的id,start方法用于驱动transtion的firing。一般情况下,你不需要实现这个接口,只要继承这个接口的抽象实现类AbstractResource,AbstractResource的start方法默认实现是首先调用模板方法doAction(稍后解释),然后检查触发条件,如果通过就直接调用transition的fire方法:public void start(Transition transition, Token token, Object args);
public ResourceType getType();
public long getId();
}
public abstract class AbstractResource implements Resource {
public void start(Transition transition, Token token, Object args) {
doAction(transition, token, args);
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition没有满足触发条件");
transition.fire(token, args);
}
public abstract void doAction(Transition transition, Token token,
Object
args) ;
}
public void start(Transition transition, Token token, Object args) {
doAction(transition, token, args);
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition没有满足触发条件");
transition.fire(token, args);
}
public abstract void doAction(Transition transition, Token token,
Object
![](http://www.blogjava.net/Images/dot.gif)
}
Transtion类的fire方法有三个操作组成:从输入库所移走token,往输出库所放入token,回调handler:
public void fire(Token token, Object
args) {
removeTokenFromInputs(token);
addTokenToOutputs(token);
invokeHandler(token, args);
}
那么具体的资源显然要实现AbstractResource中的doAction抽象方法,系统内置了五种资源:自动资源(AutoResource)、用户(User)、用户组(Group)、定时器(TimerResource)和事件监听器(ObserverResource)。显然,AutoResource、User和Group的doAction方法不需要做任何事情:![](http://www.blogjava.net/Images/dot.gif)
removeTokenFromInputs(token);
addTokenToOutputs(token);
invokeHandler(token, args);
}
public class User extends AbstractResource {
protected Group group;
![](http://www.blogjava.net/Images/dot.gif)
@Override
public void doAction(Transition transition, Token token, Object
arg){
}
}
protected Group group;
![](http://www.blogjava.net/Images/dot.gif)
![](http://www.blogjava.net/Images/dot.gif)
@Override
public void doAction(Transition transition, Token token, Object
![](http://www.blogjava.net/Images/dot.gif)
}
}
而TimerResource就需要做特殊处理了,比如我们要达到这样的效果:节点1状态已经处于就绪,可以被触发,可我们希望在就绪后延迟半分钟再触发,或者在晚上10点触发等等。这样的定时需求很常见,我采用了jdk5引入的ScheduledExecutorService来处理。系统中启动这样一个线程池,每个类似上面的请求都提交给这个线程池来处理,那么TimerResource就需要进行相应的修改:
public abstract class TimerResource extends AbstractResource {
protected int pool_size;
protected static ScheduledExecutorService scheduledExecutorService;
@Override
public long getId() {
// TODO Auto-generated method stub
return Common.TIMER_RESOURCE_ID;
}
public TimerResource() {
this.pool_size = 5;
scheduledExecutorService = Executors.newScheduledThreadPool(pool_size);
}
public static void shutdownPool() {
if (scheduledExecutorService != null)
scheduledExecutorService.shutdown();
}
public final void start(Transition transition, Token token, Object
args)
throws InterruptedException {
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition没有满足触发条件");
transition.removeTokenFromInputs(token);
doAction(transition, token, args);
}
protected class ChangeRunner implements Runnable {
private Transition transition;
private Token token;
private Object[] args;
public ChangeRunner(Transition transition, Token token, Object
args) {
this.transition = transition;
this.token = token;
this.args = args;
}
public void run() {
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition没有满足触发条件");
transition.addTokenToOutputs(token);
Object real_args[] = new Object[args.length - 2];
for (int i = 0; i < real_args.length; i++)
real_args[i] = args[i + 2];
transition.invokeHandler(token, real_args);
try {
// 回调
((WorkFlowAlgorithm) args[1]).enabledTraversing(token
.getWorkFlow());
((WorkFlowManager) args[0]).doAction(token.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
注意到,start方法不再是直接调用transition的fire方法,而仅仅是进行了第一步操作:移除输入库所的place防止重复提交。后两步操作都延迟到了提交给线程池的任务中,也就是代码中的ChangeRunner类中的run方法。例如TimerResource的子类DelayTimerResource用于处理延迟的触发,doAction就像这样:protected int pool_size;
protected static ScheduledExecutorService scheduledExecutorService;
@Override
public long getId() {
// TODO Auto-generated method stub
return Common.TIMER_RESOURCE_ID;
}
public TimerResource() {
this.pool_size = 5;
scheduledExecutorService = Executors.newScheduledThreadPool(pool_size);
}
public static void shutdownPool() {
if (scheduledExecutorService != null)
scheduledExecutorService.shutdown();
}
public final void start(Transition transition, Token token, Object
![](http://www.blogjava.net/Images/dot.gif)
throws InterruptedException {
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition没有满足触发条件");
transition.removeTokenFromInputs(token);
doAction(transition, token, args);
}
protected class ChangeRunner implements Runnable {
private Transition transition;
private Token token;
private Object[] args;
public ChangeRunner(Transition transition, Token token, Object
![](http://www.blogjava.net/Images/dot.gif)
this.transition = transition;
this.token = token;
this.args = args;
}
public void run() {
if (transition.getCondition() != null
&& !transition.getCondition().check(token))
throw new ConditionException(transition.getName()
+ " transition没有满足触发条件");
transition.addTokenToOutputs(token);
Object real_args[] = new Object[args.length - 2];
for (int i = 0; i < real_args.length; i++)
real_args[i] = args[i + 2];
transition.invokeHandler(token, real_args);
try {
// 回调
((WorkFlowAlgorithm) args[1]).enabledTraversing(token
.getWorkFlow());
((WorkFlowManager) args[0]).doAction(token.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public class DelayTimerResource extends TimerResource {
![](http://www.blogjava.net/Images/dot.gif)
![](http://www.blogjava.net/Images/dot.gif)
@Override
public void doAction(Transition transition, Token token, Object
args){
scheduledExecutorService.schedule(new ChangeRunner(transition, token,
args), this.delay, this.timeUnit);
}
}
延迟的时间,时间单位这些信息都可以在流程定义文件中设置。事件监听器资源与此类似,ObserverResource实现了java.util.Observer接口,往输出库所放入token和回调handler两步操作被放在了update方法中提供给Subject回调。![](http://www.blogjava.net/Images/dot.gif)
![](http://www.blogjava.net/Images/dot.gif)
@Override
public void doAction(Transition transition, Token token, Object
![](http://www.blogjava.net/Images/dot.gif)
scheduledExecutorService.schedule(new ChangeRunner(transition, token,
args), this.delay, this.timeUnit);
}
}
文章转自庄周梦蝶 ,原文发布时间2007-10-13
相关文章
- Linux集群和自动化维2.6.4 开发类脚本
- Linux集群和自动化维2.6.5 自动化类脚本
- 深入理解Spark:核心思想与源码分析. 3.1 SparkContext概述
- Linux集群和自动化维2.7 小结
- Ceph分布式存储实战 1.1 Ceph概述
- 深入理解Spark:核心思想与源码分析. 3.2 创建执行环境SparkEnv
- 深入理解Spark:核心思想与源码分析. 3.3 创建metadataCleaner
- Linux集群和自动化维3.5 Python(x,y)介绍
- Linux集群和自动化维3.6 轻量级自动化运维工具Fabric介绍
- Linux集群和自动化维3.6.1 Fabric的安装
- 深入理解Spark:核心思想与源码分析. 3.4 SparkUI详解
- Linux集群和自动化维3.6.2 命令行入口fab命令详解
- Linux集群和自动化维3.6.3 Fabric的核心API
- 深入理解Spark:核心思想与源码分析. 3.5 Hadoop相关配置及Executor环境变量
- Linux集群和自动化维3.7.1 开发环境中的Fabric应用实例
- Linux集群和自动化维3.7.2 线上环境中的Fabric应用实例
- 深入理解Spark:核心思想与源码分析. 3.6 创建任务调度器TaskScheduler
- Linux集群和自动化维3.8 小结
- 深入理解Spark:核心思想与源码分析. 3.7 创建和启动DAGScheduler
- 深入理解Spark:核心思想与源码分析. 3.8 TaskScheduler的启动