zl程序教程

您现在的位置是:首页 >  后端

当前栏目

async4j 普通用法、整合spring用法

Spring 用法 整合 普通
2023-09-11 14:18:43 时间

1.普通用法

asyn4j 是一个java异步方法调用框架,基于消费者与生产者模式。

async4j就是基于Executors线程池和反射机制实现的。

包括了异步方法执行,异步回调执行,异步工作缓存模块.支持Spring.
让我们写异步方法不再写很多的相关多线程代码。用asyn4j轻松搞定异步方法调用.提高程序的响应能力.

主要接口:AsynService,具体方法如下:

              @Override
			public void setWorkQueueFullHandler(WorkQueueFullHandler arg0) {}
			
			@Override
			public void setServiceHandler(AsynServiceHandler arg0) {}
	
			@Override
			public void setErrorAsynWorkHandler(ErrorAsynWorkHandler arg0) {}
			
			@Override
			public void init() {}
			
			@Override
			public Map<String, Long> getRunStatMap() {
				return null;
			}
			
			@Override
			public String getRunStatInfo() {
				return null;
			}
			
			@Override
			public void close(long arg0) {}
			
			@Override
			public void close() {}
			
			@Override
			public void addWorkWithSpring(String arg0, String arg1, Object[] arg2, AsynCallBack arg3, WorkWeight arg4) {}
			
			@Override
			public void addWork(Object arg0, String arg1, Object[] arg2, AsynCallBack arg3, WorkWeight arg4, boolean arg5) {}
			
			@Override
			public void addWork(Object arg0, String arg1, Object[] arg2, AsynCallBack arg3, WorkWeight arg4) {}
			
			@Override
			public void addWork(Object arg0, String arg1, Object[] arg2, AsynCallBack arg3) {}
			
			@Override
			public void addWork(Object arg0, String arg1, Object[] arg2) {}
			
			@Override
			public void addWork(Object arg0, String arg1) {}
			
			@Override
			public void addAsynWork(AsynWork arg0) {} 

主要实现类:AsynServiceImpl。获得实例化对象的方法有四个:

 方法1:  AsynService asynService=AsynServiceImpl.getService();
 方法2:  AsynService asynService=AsynServiceImpl.getService(maxCacheWork, addWorkWaitTime, workThreadNum, callBackThreadNum, closeServiceWaitTime);
方法3:  AsynService asynService=(AsynService) AsynServiceImpl.getCallBackExecutor(); 方法4:  AsynService asynService=(AsynService) AsynServiceImpl.getWorkExecutor(); 

  getService参数说明:

maxCacheWork:最大工作队列缓存工作数 – 300(默认值)
addWorkWaitTime:当工作队列满时添加工作等待时间-- Long.MAX_VALUE(默认值)
workThreadNum:异步工作执行线程池大小 ---- CPU核数/2 +1(默认值)
callBackThreadNum:回调执行线程池大小 --- CPU核数/2(默认值)
closeServiceWaitTime:服务关闭等待时间 ---- 60000s(默认值)
普通方法实现(相对于整合spring而言)
jar包:async4j-1.3.jar commoms-logging-1.2.jar
下载地址:http://download.csdn.net/detail/u012373717/9673060
async4j使用了commoms-logging,需自行导入
否则
方式1:异步工作内容在主线程类中
package Async4jDemo1;

import com.googlecode.asyn4j.core.callback.AsynCallBack;
import com.googlecode.asyn4j.service.AsynService;
import com.googlecode.asyn4j.service.AsynServiceImpl;

public class async4jDemo {

	public static void main(String[] args) throws InterruptedException {
		async4jDemo asy=new async4jDemo();
		asy.doWork();
	}
	
	public void doWork() throws InterruptedException {
		// 初始化异步工作服务
		AsynService asynService = AsynServiceImpl.getService();
		// 启动服务
		asynService.init();
		// 异步回调对象
		AsynCallBack back = new CallBackDemo1();
		String string="this is a test";
		for(int i=0;i<3;i++) {
			asynService.addWork(this, "test", new Object[] {string+"::"+i},back);//通过添加i,区别哪项异步工作进行了回调
			Thread.sleep(1000);
		}
	}

	public int test(String string){
		String[] split = string.split("::");
		System.out.println("异步工作中: "+ split[0]);
		return Integer.valueOf(split[1]);
	}
}

class CallBackDemo1 extends AsynCallBack {
	
	private static final long serialVersionUID = 1L;

	@Override
	public void doNotify() {
		 //输出异步方法调用返回结果   
        System.out.println("CallBack's result: "+this.methodResult);  
	}

}

 方法2:异步工作内容在单独类中

package Async4jDemo2;

import com.googlecode.asyn4j.core.callback.AsynCallBack;
import com.googlecode.asyn4j.service.AsynService;
import com.googlecode.asyn4j.service.AsynServiceImpl;

public class Async4jDemo {

	public static void main(String[] args) throws InterruptedException {
		Async4jDemo asy = new Async4jDemo();
		asy.doWork();
	}

	public void doWork() throws InterruptedException {
		// 初始化异步工作服务
		AsynService asynService = AsynServiceImpl.getService();
		// 启动服务
		asynService.init();

		TargetServiceDemo targetService = new TargetServiceDemo();
		String str = "this is a test";
		for (int i = 0; i < 3; i++) {
			asynService.addWork(targetService, "test", new Object[] { str + "::" + i }, targetService.back);
			Thread.sleep(1000);
		}
	}
}

class CallBackDemo extends AsynCallBack {
	private static final long serialVersionUID = 1L;

	@Override
	public void doNotify() {
		// 输出异步方法调用返回结果
		System.out.println("CallBack: " + this.methodResult);
	}

}

  

package Async4jDemo2;
import com.googlecode.asyn4j.core.callback.AsynCallBack;

public class TargetServiceDemo {
	// 异步回调对象
	AsynCallBack back = new CallBackDemo();

	public int test(String string) {
		String[] split = string.split("::");
		System.out.println("异步工作中: " + split[0]);
		return Integer.valueOf(split[1]);
	}

}

  

 

2.整合spring用法

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:mongo="http://www.springframework.org/schema/data/mongo"
	xsi:schemaLocation="http://www.springframework.org/schema/context
          http://www.springframework.org/schema/context/spring-context-3.0.xsd
          http://www.springframework.org/schema/data/mongo
          http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd
          http://www.springframework.org/schema/beans
          http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">


	<bean id="asyncService" class="com.googlecode.asyn4j.spring.AsynServiceFactoryBean">
		<!--设置自定义相关参数 -->
		<!--(maxCacheWork)最大工作队列缓存工作数 – 300(默认值) -->
		<!--(addWorkWaitTime)当工作队列满时添加工作等待时间- Long.MAX_VALUE(默认值) -->
		<!--(workThreadNum)异步工作执行线程池大小 - CPU核数/2 +1(默认值) -->
		<!--(callBackThreadNum)回调执行线程池大小 - CPU核数/2(默认值) -->
		<!--(closeServiceWaitTime) 服务关闭等待时间 - 60000s(默认值) -->
		<property name="maxCacheWork" value="${maxCacheWork}"></property>
		<property name="addWorkWaitTime" value="${addWorkWaitTime}"></property>
		<property name="workThreadNum" value="${workThreadNum}"></property>
		<property name="callbackThreadNum" value="${callbackThreadNum}"></property>
		<property name="closeServiceWaitTime" value="${closeServiceWaitTime}"></property>
		<!--添加相关处理器 -->
		<property name="errorAsynWorkHandler">
			<bean
				class="com.googlecode.asyn4j.core.handler.DefaultErrorAsynWorkHandler" />
		</property>
		<property name="workQueueFullHandler">
			<bean class="com.googlecode.asyn4j.core.handler.CacheAsynWorkHandler" />
		</property>
	</bean>

	<!-- spring的属性加载器,加载properties文件中的属性 -->
	<bean id="propertyConfigurer"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="location">
			<value>conf/sys_conf.properties</value>
		</property>
		<property name="fileEncoding" value="utf-8" />
	</bean>

</beans>

  

package Aysnc4jSpringDemo;

import java.io.FileNotFoundException;
import java.io.IOException;


import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.googlecode.asyn4j.core.callback.AsynCallBack;
import com.googlecode.asyn4j.service.AsynService;


public class Async4jSpringDemo {

	public static void main(String[] args) throws FileNotFoundException, IOException {

		ApplicationContext ac = new ClassPathXmlApplicationContext("conf/applicationContext.xml");

		AsynService asynService = (AsynService) ac.getBean("asyncService");

		Async4jSpringDemo async4jSpringDemo = new Async4jSpringDemo();
		async4jSpringDemo.DoWOrk(asynService);
	}

	public void DoWOrk(AsynService asynService) {
		String str = "this is a test!";
		for (int i = 0; i < 3; i++) {
			AsynCallBack callBack = new CallBack();
			asynService.addWork(this, "test", new Object[] { str + "::" + i }, callBack);
		}
	}

	public int test(String string) {
		String[] split = string.split("::");
		System.out.println("异步工作中: " + split[0]);
		return Integer.valueOf(split[1]);
	}
}

class CallBack extends AsynCallBack {

	private static final long serialVersionUID = 1L;

	@Override
	public void doNotify() {

		// 输出异步方法调用返回结果
		System.out.println("CallBack's result: " + this.methodResult);

	}

}

  

 

 3.相关处理器

       

 asynService.setErrorAsynWorkHandler(arg0);

 asynService.setServiceHandler(arg0);

 asynService.setWorkQueueFullHandler(arg0);

3.1 异步工作缓冲器--(当工作队列工作数超过maxCacheWork时由处理器处理)

anycService.setWorkQueueFullHandler(new CacheAsynWorkHandler(100));//顺序不能变 
anycService.init();

系统有一个默认的处理器 CacheAsynWorkHandler 。

建议实现自己的处理器,需实现 addAsynWork,process 方法 ;自定义处理器继承 WorkQueueFullHandler 抽象类。process建议启动一个守护线程监听。

当工作队列中的工作超过300个时,异步工作将由CacheAsynWorkHandler处理;

3.2服务启动和关闭处理器 --(当服务启动和关闭调用)

anycService.setCloseHander(new FileAsynServiceHandler("c:/asyn4j.data"));

设置c:/asyn4j.data为持久化文件 FileAsynServiceHandler 是 AsynServiceHandler 的一个例子将任务持久化到文件,当系统启动时加载文件内容到内存,关闭时将未执行的任务持久化到文件。

大家可以参考源码将任务持久化到别外的地方(memcached)。自定义处理器继承 AsynServiceHandler 抽象类

3.3异步工作执行异常处理器 --(当工作执行出现异常时处理器)

anycService.setErrorAsynWorkHandler(new DefaultErrorAsynWorkHandler());

自定义处理器继承 ErrorAsynWorkHandler 抽象类

3.4所有处理器为可选配置,建议根据自己的业务继承相关的类实现自己的处理器

3.5异步工作优级

分成三个等级WorkWeight.LOW,WorkWeight.MIDDLE, WorkWeight.HIGH 默认优先级为WorkWeight.MIDDLE 。

 4.添加异步工作API

普通方法:

asynService.addAsynWork(AsyncWork arg0); asynService.addWork(Object, String); asynService.addWork(Object, String, Object[]); asynService.addWork(Object, String, Object[], AsynCallBack); asynService.addWork(Object, String, Object[],AsynCallBack, WorkWeight ); asynService.addWork(Object, String, Object[],AsynCallBack, WorkWeight,
boolean );

其中:

 * @param Object -- 目标对象(可以是 Class,Object,String(spring))

 * @param String -- 目标方法

 * @param object[]-- 目标方法参数

 * @param AsynCallBack --回调对象

 * @param WorkWeight -- 工作权重

 * @param boolean --  如果目标对象为class,实例化后是否缓存

整合spring方法:

asynService.addWorkWithSpring(String, String, Object[], AsynCallBack, WorkWeight);
其中:

 * @param String-- 目标对象BeanName

 * @param String -- 目标方法

 * @param object[] -- 目标方法参数

 * @param AsynCallBack --回调对象

 * @param WorkWeight -- 工作权重

获取运行状态信息
 public Map getRunStatMap();
Map key 说明:
total: 累计接收异步工作数
execute: 执行异步工作数
callback: 执行回调数 
关闭服务等待 waitTime 秒
public
void close(long waitTime);

5.注意事项

 添加异步工作时,对于目标方法,无法区分方法同名的并且参数是继承关系的方法。

 

 

项目地址:http://asyn4j.googlecode.com 
源码SVN : http://asyn4j.googlecode.com/svn/branches/asyn4j 
WIKI: http://code.google.com/p/asyn4j/wiki/user_guide