批量处理工具类
2023-04-18 14:44:18 时间
业务开发中,时常会批量执行任务,例如批量同时调用4个http接口或者rpc接口,这类业务代码执行具有通用性,为了提高开发效率、可复用性、可扩展性,简化代码,抽象出通用的工具类,方便开发同学使用。使用者只关心入参、具体任务执行、以及任务执行结果、线程池,并不关心批量处理的过程。
任务处理流程图
代码
public class BatchQuery {
/**
* 并行且异步处理结果
*
* @param tasks 任务列表
* @param p 参数
* @param handle 具体业务处理
* @param complete 完成处理逻辑
* @param executor 线程池
* @param <T>
* @param <P>
* @param <R>
*/
public static <T, P, R> void asyncQueryHandleAsync(List<T> tasks, P p, Function<T, P, R> handle,
BiConsumer<R, Throwable> complete, Executor executor) {
Objects.requireNonNull(p);
Optional.ofNullable(tasks).ifPresent(task -> {
val cfs = task.stream()
.map(t ->
CompletableFuture.supplyAsync(
() -> handle.apply(t, p), executor).whenCompleteAsync(complete)
).toArray(CompletableFuture[]::new);
//等待总任务完成
CompletableFuture.allOf(cfs).join();
});
}
/**
* 并行且同步处理结果
*
* @param tasks 任务列表
* @param p 参数
* @param handle 具体业务处理
* @param complete 完成处理逻辑
* @param executor 线程池
* @param <T>
* @param <P>
* @param <R>
*/
public static <T, P, R> void asyncQueryHandleSync(List<T> tasks, P p, Function<T, P, R> handle,
BiConsumer<R, Throwable> complete, Executor executor) {
Objects.requireNonNull(p);
Optional.ofNullable(tasks).ifPresent(task -> {
val cfs = task.stream()
.map(t ->
CompletableFuture.supplyAsync(
() -> handle.apply(t, p), executor).whenComplete(complete)
).toArray(CompletableFuture[]::new);
//等待总任务完成
CompletableFuture.allOf(cfs).join();
});
}
}
@FunctionalInterface
public interface Function<T, P, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @param p
* @return the function result
*/
R apply(T t, P p);
}
使用示例
List<OrderDTO> orderDTOS = Lists.newArrayList();
try {
BatchQuery.asyncQueryHandleSync(BIZ_TYPE_LIST, onTheWayOrderSwitchProcessCtx.getOnTheWayOrderSwitchParam(),
(bizTypeEnum, orderSwitchParam) ->
standardApiSupport
.getDriverOrderApiByBizType(bizTypeEnum.getValue(), CONFIG)
.queryOrderList
(
OrderConverter
.buildDriverOrderReqDTO(currentStartTime, currentEndTime, bizTypeEnum,
orderSwitchParam)
), (r, ex) -> {
if (Objects.isNull(ex) && JudgeIsSuccessUtil.judgeDataNotNull(r)) {
orderDTOS.addAll(r.getDataList());
}
}, RPC_SEARCH_EXECUTOR_SERVICE);
} catch (Exception e) {
throw new ApiException();
}
使用者需要传入具体的任务,指定线程池,以及共同参数P,P的存在具有合理性,往往任务会使用共同的参数,因此自定义了函数式接口Function,以及具体的处理handle,handle里可以做差异化处理,执行结果会在complete中拿到,做具体的业务处理,可以大大减少重复代码。
相关文章
- 【技术种草】cdn+轻量服务器+hugo=让博客“云原生”一下
- CLB运维&运营最佳实践 ---访问日志大洞察
- vnc方式登陆服务器
- 轻松学排序算法:眼睛直观感受几种常用排序算法
- 十二个经典的大数据项目
- 为什么使用 CDN 内容分发网络?
- 大数据——大数据默认端口号列表
- Weld 1.1.5.Final,JSR-299 的框架
- JavaFX 2012:彻底开源
- 提升as3程序性能的十大要点
- 通过凸面几何学进行独立于边际的在线多类学习
- 利用行动影响的规律性和部分已知的模型进行离线强化学习
- ModelLight:基于模型的交通信号控制的元强化学习
- 浅谈Visual Source Safe项目分支
- 基于先验知识的递归卡尔曼滤波的代理人联合状态和输入估计
- 结合网络结构和非线性恢复来提高声誉评估的性能
- 最佳实践丨云开发CloudBase多环境管理实践
- TimeVAE:用于生成多变量时间序列的变异自动编码器
- 具有线性阈值激活的神经网络:结构和算法
- 内网渗透之横向移动 -- 从域外向域内进行密码喷洒攻击