写一个简单的工作流(三)
一个 简单 工作
2023-09-11 14:16:05 时间
上午测试了下并发情况下的表现,测试场景:一个有20个节点,包括选择、顺序、并行路由的流程,所有节点都设置为自动执行,1000个线程并发启动案例,也就是这个流程同时有1000个案例在跑,全部跑完结果差强人意,比单线程慢了接近30倍。仔细调整了算法和加锁粒度,尽管整体性能有所提高,但是多线程和单线程间执行的差距仍然没有多大变化,性能瓶颈还是在核心的调度算法上,还需要分析下。测试程序如下:
package net.rubyeye.insect.workflow.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import net.rubyeye.insect.workflow.Place;
import net.rubyeye.insect.workflow.Token;
import net.rubyeye.insect.workflow.Transition;
import net.rubyeye.insect.workflow.WorkFlow;
import net.rubyeye.insect.workflow.WorkFlowManager;
import net.rubyeye.insect.workflow.basic.BasicWorkflowManager;
import net.rubyeye.insect.workflow.comm.TransitionType;
import net.rubyeye.insect.workflow.config.DefaultConfiguration;
import junit.framework.TestCase;
public class CompositeProcessTest extends TestCase {
private WorkFlowManager wm;
WorkFlow composite;
private CyclicBarrier barrier;
private static final int total = 1000;
@Override
protected void setUp() throws Exception {
this.barrier = new CyclicBarrier(total + 1);
wm = new BasicWorkflowManager();
wm.setConfiguration(new DefaultConfiguration());
WorkFlow sequence = wm.getWorkFlow("sequence");
WorkFlow concurrency = wm.getWorkFlow("concurrency");
WorkFlow choose = wm.getWorkFlow("choose");
// 组合流程
composite = new WorkFlow();
composite.setName("composite");
composite.setId(100);
wm.saveWorkFlow(composite);
// 修改开始结束节点的输入输出库所
sequence.getEnd().setType(TransitionType.NORMAL);
sequence.getEnd().setOutputs(concurrency.getStart().getInputs());
concurrency.getEnd().setType(TransitionType.NORMAL);
concurrency.getEnd().setOutputs(choose.getStart().getInputs());
composite.setStart(sequence.getStart());
composite.setEnd(choose.getEnd());
List Transition transitions = new ArrayList Transition ();
transitions.addAll(sequence.getTransitions());
transitions.addAll(concurrency.getTransitions());
transitions.addAll(choose.getTransitions());
composite.setTransitions(transitions);
}
public void testConcurrencyCompositeProcesss() throws Exception {
for (int i = 0; i total; i++) {
new FlowThread().start();
}
barrier.await();
long start = System.currentTimeMillis();
barrier.await();
long end = System.currentTimeMillis();
System.out.println("创建" + total + "个流程并发运行完毕\n花费时间:" + (end - start)
/ 1000.0 + "秒");
for (Transition transition : composite.getTransitions()) {
System.out.println(transition.getName() + " "
+ transition.isEnable());
for (Place place : transition.getOutputs()) {
System.out.println("place " + place.getId() + " "
+ place.getTokens().size());
}
}
}
public void testCompositeProcesss() throws Exception {
long start = System.currentTimeMillis();
for (int i = 0; i total; i++) {
Token token1 = wm.startWorkFlow("composite");
token1.setAttribute("name", "dennis");
token1.setAttribute("num", 21);
wm.doAction(token1.getId());
assertTrue(token1.isFinished());
}
long end = System.currentTimeMillis();
System.out.println("创建" + total + "个流程运行完毕\n花费时间:" + (end - start)
/ 1000.0 + "秒");
}
class FlowThread extends Thread {
@Override
public void run() {
try {
barrier.await();
// wm = new BasicWorkflowManager();
Token token1 = wm.startWorkFlow("composite");
token1.setAttribute("name", "dennis");
token1.setAttribute("num", 21);
wm.doAction(token1.getId());
assertTrue(token1.isFinished());
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
文章转自庄周梦蝶 ,原文发布时间2007-10-12
浅析数据工作流Prefect Prefect 是一种新的工作流管理系统,专为现代基础设施而设计,由开源的 Prefect Core 工作流引擎提供支持。 用户只需将任务组织成流程,Prefect 负责其余的工作,可让您非常容易使用数据工作流并添加重试、日志记录、动态映射、缓存、失败通知等语义。
五分钟快速了解Airflow工作流 Airflow是一个以编程方式创作、调度和监控工作流的平台。 使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。 当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。
package net.rubyeye.insect.workflow.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import net.rubyeye.insect.workflow.Place;
import net.rubyeye.insect.workflow.Token;
import net.rubyeye.insect.workflow.Transition;
import net.rubyeye.insect.workflow.WorkFlow;
import net.rubyeye.insect.workflow.WorkFlowManager;
import net.rubyeye.insect.workflow.basic.BasicWorkflowManager;
import net.rubyeye.insect.workflow.comm.TransitionType;
import net.rubyeye.insect.workflow.config.DefaultConfiguration;
import junit.framework.TestCase;
public class CompositeProcessTest extends TestCase {
private WorkFlowManager wm;
WorkFlow composite;
private CyclicBarrier barrier;
private static final int total = 1000;
@Override
protected void setUp() throws Exception {
this.barrier = new CyclicBarrier(total + 1);
wm = new BasicWorkflowManager();
wm.setConfiguration(new DefaultConfiguration());
WorkFlow sequence = wm.getWorkFlow("sequence");
WorkFlow concurrency = wm.getWorkFlow("concurrency");
WorkFlow choose = wm.getWorkFlow("choose");
// 组合流程
composite = new WorkFlow();
composite.setName("composite");
composite.setId(100);
wm.saveWorkFlow(composite);
// 修改开始结束节点的输入输出库所
sequence.getEnd().setType(TransitionType.NORMAL);
sequence.getEnd().setOutputs(concurrency.getStart().getInputs());
concurrency.getEnd().setType(TransitionType.NORMAL);
concurrency.getEnd().setOutputs(choose.getStart().getInputs());
composite.setStart(sequence.getStart());
composite.setEnd(choose.getEnd());
List Transition transitions = new ArrayList Transition ();
transitions.addAll(sequence.getTransitions());
transitions.addAll(concurrency.getTransitions());
transitions.addAll(choose.getTransitions());
composite.setTransitions(transitions);
}
public void testConcurrencyCompositeProcesss() throws Exception {
for (int i = 0; i total; i++) {
new FlowThread().start();
}
barrier.await();
long start = System.currentTimeMillis();
barrier.await();
long end = System.currentTimeMillis();
System.out.println("创建" + total + "个流程并发运行完毕\n花费时间:" + (end - start)
/ 1000.0 + "秒");
for (Transition transition : composite.getTransitions()) {
System.out.println(transition.getName() + " "
+ transition.isEnable());
for (Place place : transition.getOutputs()) {
System.out.println("place " + place.getId() + " "
+ place.getTokens().size());
}
}
}
public void testCompositeProcesss() throws Exception {
long start = System.currentTimeMillis();
for (int i = 0; i total; i++) {
Token token1 = wm.startWorkFlow("composite");
token1.setAttribute("name", "dennis");
token1.setAttribute("num", 21);
wm.doAction(token1.getId());
assertTrue(token1.isFinished());
}
long end = System.currentTimeMillis();
System.out.println("创建" + total + "个流程运行完毕\n花费时间:" + (end - start)
/ 1000.0 + "秒");
}
class FlowThread extends Thread {
@Override
public void run() {
try {
barrier.await();
// wm = new BasicWorkflowManager();
Token token1 = wm.startWorkFlow("composite");
token1.setAttribute("name", "dennis");
token1.setAttribute("num", 21);
wm.doAction(token1.getId());
assertTrue(token1.isFinished());
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
文章转自庄周梦蝶 ,原文发布时间2007-10-12
浅析数据工作流Prefect Prefect 是一种新的工作流管理系统,专为现代基础设施而设计,由开源的 Prefect Core 工作流引擎提供支持。 用户只需将任务组织成流程,Prefect 负责其余的工作,可让您非常容易使用数据工作流并添加重试、日志记录、动态映射、缓存、失败通知等语义。
五分钟快速了解Airflow工作流 Airflow是一个以编程方式创作、调度和监控工作流的平台。 使用 Airflow 将工作流创作为有向无环图(DAG)任务。 Airflow 调度程序按照你指定的依赖项在一组workers上执行您的任务。同时,Airflow拥有丰富的命令行实用程序使得在DAG上进行复杂的诊断变得轻而易举。并且提供了丰富的用户界面使可视化生产中运行的工作流、监控进度和需要排查问题时变得非常容易。 当工作流被定义为代码时,它们变得更易于维护、可版本化、可测试和协作。
相关文章
- WWDC15 Session笔记 - 30 分钟开发一个简单的 watchOS 2 app
- 写出一个程序,接受一个十六进制的数值字符串,输出该数值的十进制字符串。(多组同时输入 )
- Android问题-打开DelphiXE8与DelphiXE10新建一个空工程提示"out of memory"
- JBPM学习(一):实现一个简单的工作流例子全过程
- Python将多个excel表格合并为一个表格
- mysql 重新整理——索引优化一个简单的案例 [十一]
- 项目:用Pygame实现一个简单的垃圾分类小游戏
- SAP ALV tree的一个最简单demo
- 介绍一个功能强大的 Visual Studio Code 扩展 - Rest Client,能部分替代 Postman
- rxjs pipe和filter组合的一个实际例子的单步调试
- 创建一个Business partner reference extension field
- 使用maven的一个最简单的例子
- RAC 中一个 retain cycle 问题
- 送给她一个安稳的小窝(Python实现)
- matlab官方库没有桑基图(sankey) 那就自己写一个
- 编写一个程序实现两个一元多项式相加的运算
- 例 9.8 建立一个如图9.9所示的简单链表,它由3个学生数据的结点组成,要求输出各结点中的数据。
- 解答私信@icey�192 //2021-11-1 C++ 已知1900是鼠年,输入一个年份,输出其对应的生肖。
- 读懂“人性”你会发现:高手毁掉一个人有多简单?只需要两个手段——“期望效应”,又称“罗森塔尔效应”、“皮格马利翁效应”
- 一步一步写一个简单通用的makefile(二)
- pygame学习笔记(6)——一个超级简单的游戏
- 搭建一个简单Git服务器