主线程等待几个子线程执行完成方案
2023-09-11 14:20:55 时间
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(3);
long start = System.currentTimeMillis();
for (int i = 0; i 3; i++) {
new Thread(new SubRunnable(i, latch)).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() - start);
System.out.println("Main finished");
}
static class SubRunnable implements Runnable {
private int id = -1;
private CountDownLatch latch;
SubRunnable(int id, CountDownLatch latch) {
this.id = id;
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println(String
.format("Sub Thread %d finished", id));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
}
public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(3); List Callable Void subs = new ArrayList Callable Void (); for (int i = 0; i 3; i++) { subs.add(new SubCallable(i)); } long start = System.currentTimeMillis(); try { pool.invokeAll(subs); } finally { pool.shutdown(); } System.out.println(System.currentTimeMillis() - start); System.out.println("Main finished"); } static class SubCallable implements Callable Void { private int id = -1; public SubCallable(int id) { this.id = id; } @Override public Void call() throws Exception { try { Thread.sleep(3000); System.out.println(String .format("Child Thread %d finished", id)); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
public T List Future T invokeAll(Collection ? extends Callable T tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); List Future T futures = new ArrayList Future T (tasks.size()); boolean done = false; try { for (Callable T t : tasks) { RunnableFuture T f = newTaskFor(t); futures.add(f); execute(f); } for (Future T f : futures) { if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; return futures; } finally { if (!done) for (Future T f : futures) f.cancel(true); } }
* Created by shenhongxi on 2016/8/12. * 子线程与主线程是顺序执行的,各子线程之间还是异步的 */ public class JoinTest { public static void main(String[] args) throws Exception { Thread t1 = new Thread(new SubRunnable(0)); Thread t2 = new Thread(new SubRunnable(1)); Thread t3 = new Thread(new SubRunnable(2)); long start = System.currentTimeMillis(); t1.start(); t2.start(); t3.start(); t1.join(); t2.join(); t3.join(); System.out.println(System.currentTimeMillis() - start); System.out.println("Main finished"); } static class SubRunnable implements Runnable { private int id = -1; SubRunnable(int id) { this.id = id; } @Override public void run() { try { System.out.println("hi, Im id-" + id); Thread.sleep(9000); System.out.println(String .format("Sub Thread %d finished", id)); } catch (InterruptedException e) { e.printStackTrace(); } } }
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor { private final Object poolSizeMonitor = new Object(); private int corePoolSize = 1; private int maxPoolSize = Integer.MAX_VALUE; private int keepAliveSeconds = 60; private boolean allowCoreThreadTimeOut = false; private int queueCapacity = Integer.MAX_VALUE; private ThreadPoolExecutor threadPoolExecutor; /** * Set the ThreadPoolExecutors core pool size. * Default is 1. * p b This setting can be modified at runtime, for example through JMX. /b */ public void setCorePoolSize(int corePoolSize) { synchronized (this.poolSizeMonitor) { this.corePoolSize = corePoolSize; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setCorePoolSize(corePoolSize); } } } /** * Return the ThreadPoolExecutors core pool size. */ public int getCorePoolSize() { synchronized (this.poolSizeMonitor) { return this.corePoolSize; } }
public interface ExecutorService extends Executor { T List Future T invokeAll(Collection ? extends Callable T tasks) throws InterruptedException; public interface Executor { void execute(Runnable command); public abstract class AbstractExecutorService implements ExecutorService{ public T List Future T invokeAll(Collection ? extends Callable T tasks) { // ... } public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue Runnable workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
多线程001 - 主线程等待子线程结束 在很多时候,我们期望实现这么一种功能:在主线程中启动一些子线程,等待所有子线程执行结束后,主线程再继续执行。比如:老板分配任务,众多工人开始工作,等所有工人完成工作后,老板进行检查。
CountDownLatch用队列来存放任务,主要是一个构造器和两个方法,相关代码这里不予赘述。CountDownLatch很贴合我们的要求,但没用到线程池,而且latch是只提供了计数功能然后子线程的逻辑有没有可能会在主线程逻辑之后执行??,综合考虑,我推荐下面的这种方案。
2. ExecutorService
public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(3); List Callable Void subs = new ArrayList Callable Void (); for (int i = 0; i 3; i++) { subs.add(new SubCallable(i)); } long start = System.currentTimeMillis(); try { pool.invokeAll(subs); } finally { pool.shutdown(); } System.out.println(System.currentTimeMillis() - start); System.out.println("Main finished"); } static class SubCallable implements Callable Void { private int id = -1; public SubCallable(int id) { this.id = id; } @Override public Void call() throws Exception { try { Thread.sleep(3000); System.out.println(String .format("Child Thread %d finished", id)); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
public T List Future T invokeAll(Collection ? extends Callable T tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); List Future T futures = new ArrayList Future T (tasks.size()); boolean done = false; try { for (Callable T t : tasks) { RunnableFuture T f = newTaskFor(t); futures.add(f); execute(f); } for (Future T f : futures) { if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; return futures; } finally { if (!done) for (Future T f : futures) f.cancel(true); } }
* Created by shenhongxi on 2016/8/12. * 子线程与主线程是顺序执行的,各子线程之间还是异步的 */ public class JoinTest { public static void main(String[] args) throws Exception { Thread t1 = new Thread(new SubRunnable(0)); Thread t2 = new Thread(new SubRunnable(1)); Thread t3 = new Thread(new SubRunnable(2)); long start = System.currentTimeMillis(); t1.start(); t2.start(); t3.start(); t1.join(); t2.join(); t3.join(); System.out.println(System.currentTimeMillis() - start); System.out.println("Main finished"); } static class SubRunnable implements Runnable { private int id = -1; SubRunnable(int id) { this.id = id; } @Override public void run() { try { System.out.println("hi, Im id-" + id); Thread.sleep(9000); System.out.println(String .format("Sub Thread %d finished", id)); } catch (InterruptedException e) { e.printStackTrace(); } } }
最后,我们顺便提下org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor { private final Object poolSizeMonitor = new Object(); private int corePoolSize = 1; private int maxPoolSize = Integer.MAX_VALUE; private int keepAliveSeconds = 60; private boolean allowCoreThreadTimeOut = false; private int queueCapacity = Integer.MAX_VALUE; private ThreadPoolExecutor threadPoolExecutor; /** * Set the ThreadPoolExecutors core pool size. * Default is 1. * p b This setting can be modified at runtime, for example through JMX. /b */ public void setCorePoolSize(int corePoolSize) { synchronized (this.poolSizeMonitor) { this.corePoolSize = corePoolSize; if (this.threadPoolExecutor != null) { this.threadPoolExecutor.setCorePoolSize(corePoolSize); } } } /** * Return the ThreadPoolExecutors core pool size. */ public int getCorePoolSize() { synchronized (this.poolSizeMonitor) { return this.corePoolSize; } }
看到我们熟悉的ThreadPoolExecutor之后,我们瞬间明白了一切。
另外我们脑补下几个接口/类的关系
public interface ExecutorService extends Executor { T List Future T invokeAll(Collection ? extends Callable T tasks) throws InterruptedException; public interface Executor { void execute(Runnable command); public abstract class AbstractExecutorService implements ExecutorService{ public T List Future T invokeAll(Collection ? extends Callable T tasks) { // ... } public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue Runnable workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
多线程001 - 主线程等待子线程结束 在很多时候,我们期望实现这么一种功能:在主线程中启动一些子线程,等待所有子线程执行结束后,主线程再继续执行。比如:老板分配任务,众多工人开始工作,等所有工人完成工作后,老板进行检查。
相关文章
- JVM 内部运行线程介绍
- 基本线程同步(二)同步方法
- 线程执行者(三)创建一个大小固定的线程执行者
- C#中的线程(二) 线程同步基础
- 假期(进程、线程、协程)
- ThreadPoolExecutor线程池
- java中线程存活和线程执行的问题!
- java线程--volatile实现可见性
- Redis自旋锁解决分布高并发问题:使线程异步变为同步执行
- 《android开发艺术探索》读书笔记(十一)--Android的线程和线程池
- Redis自旋锁解决分布高并发问题:使线程异步变为同步执行
- 请你简要说明一下线程的基本状态以及状态之间的关系?
- 【完整代码】使用Semaphore实现线程的交替执行打印 A1B2C3D4E5
- Atitit.swt 线程调用ui控件的方法
- 多线程之线程的执行顺序
- 线程生命周期(状态)
- 【SpringBoot系列】 - 异步线程池的使用
- 10分钟带你徒手做个Java线程池
- 深入分析3种线程池执行任务的逻辑方法
- C# 执行固定个数任务自行控制进入线程池的线程数量,多任务同时但是并发数据限定
- dubbo线程池作用于接口而不是方法
- java线程执行的优先级
- java ee wildfly spring 在线程池的线程中注入
- SpringBoot异步及线程池配置
- C# 中await前后执行线程的问题
- (01)ORB-SLAM2源码无死角解析-(60) 闭环线程→闭环矫正: CorrectLoop→地图点融合、共视关系更新
- Servlet — 线程安全问题