zl程序教程

您现在的位置是:首页 >  后端

当前栏目

线程间通信方式(3)

线程 方式 间通信
2023-06-13 09:17:14 时间

前文了解了线程通信方式中的CountDownLatch, Condition,ReentrantLock以及CyclicBarrier,接下来我们继续了解其他的线程间通信方式。

Phaser

Phaser是JDK1.7中引入的一种功能上和CycliBarrier和CountDownLatch相似的同步工具,相对这两者而言其用法更加灵活,同时Phaser也支持重用。

在Phaser中将需要协作完成的任务分成多个阶段,每个阶段的参与者可指定,参与者可以随时注册并参与到某个阶段或者取消参与本阶段。以选修课考试为例,说明Phaser的工作逻辑,假设现有选修课3门,政治,历史,地理,各选修人数分别为20,10,10.按Phaser实现考试逻辑如下:

  • • 第一阶段考政治,总共应有9名同学参加考试,在考试开始时,8位同学开始答题,另外一位同学未到,考试中途,最后一位同学进入,开始考试,所有同学答题完成后,政治考试结束
  • • 第二阶段考历史,总共9名同学参考考试,在考试结束前,3名同学弃考,则实际参与考试有6名同学,所有同学答题完成后,历史考试结束
  • • 第三阶段考地理,总共9名同学参与考试,中途无意外,所有同学答题完成后,地理考试结束

至此选修课考试的三个阶段均完成,所以选修课考试这个任务结束,其中第一阶段中晚到参考考试的同学说的就是参与者可以随时注册并参与到某个阶段,第二阶段中弃考的同学说的就是参与者可以随时取消参与本阶段,当所有参与本阶段的参与者均取消,则意味着该阶段完成。

在Phaser中,针对一个阶段而言,每一个参与者都被称为一个party,可以通过构造函数指定参与者数量,也可以通过register使parties(party的总和)自增,当当前阶段的所有参与者等于parties的数量时,此时phase自增1,进入下一个阶段,回调onAdvance方法

Phaser提供的核心函数如下所示:

函数名称

描述

备注

register()

注册一个party,使得parties+1

/

bulkRegister(int parties)

批量注册party,使得parties变为已有个数与传入参数之和

/

arriveAndDeregister()

当前任务已完成,使parties计数减1,不会形成阻塞

/

arriveAndAwaitAdvance()

已达到执行点,线程阻塞,等待下一阶段唤醒继续执行

/

awaitAdvance(int phase)

参数是一个已完成的阶段编号,通常以已完成任务的arrive或者arriveAndDeregister函数的返回值作为取值,如果传入参数的阶段编号和当前阶段编号相同,则在此处等待,如果不同或者Phaser已经是terminated状态,则立即返回

/

arrive()

达到当前阶段,不等待其他参与者到达

/

arriveAndAwaitAdvance

以上述政治考试为例,学习Phaser基本使用

public static void main(String[] args) {
    // 创建Phaser
    Phaser phaser = new Phaser(){
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("政治考试完成");
                    break;
                case 1:
                    System.out.println("历史考试完成");
                    break;
                case 2:
                    System.out.println("地理考试完成");
                    break;
            }
            // 如果到达某一阶段,Phaser中参与者为0,则会销毁该Phaser
            return super.onAdvance(phase, registeredParties);
        }
    };
    
    IntStream.range(1,10).forEach(number->{
        phaser.register();
        Thread student= new Thread(()->{
            System.out.println("学生"+number+"arrive advance");
            // 等待其他线程,此时block
            phaser.arriveAndAwaitAdvance();
            System.out.println("学生"+number+"政治开始答题");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("学生"+number+"政治交卷");
            // 考试完成,取消计数,参与者减1
            phaser.arriveAndDeregister();
            System.out.println("Phaser is terminated :" +phaser.isTerminated());
        });
        student.start();
    });
    System.out.println("Phaser is terminated :" +phaser.isTerminated());
}

输出如下:

1-4-5-1

从上面可以看出,Phaser中通过arriveAndAwaitAdvance阻塞当前线程,当所有线程到达阻塞栅栏时,唤醒等待线程继续执行,进而达到线程间同步协作。

awaitAdvance

有时候,当Phaser 在当前阶段结束时,我们需要兜底做一些策略,比如说资源的释放,状态的检查上报等,此时就需要用到awaitAdvance,awaitAdvance接受一个阶段编号,如果当前阶段编号和传入的相等,则会进入等待状态,等到所有参与者都到达该阶段栅栏时,被唤醒。实例代码如下:

public static class ThreadA implements Runnable {
    private Phaser phaser;

    public ThreadA(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start ");


        phaser.arriveAndAwaitAdvance();

        System.out.println(Thread.currentThread().getName() + " end " );
    }
}

public static class ThreadB implements Runnable {
    private Phaser phaser;

    public ThreadB(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start " );

        phaser.arriveAndAwaitAdvance();

        System.out.println(Thread.currentThread().getName() + " end ");
    }
}

public static class ThreadC implements Runnable {
    private Phaser phaser;

    public ThreadC(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
            System.out.println(Thread.currentThread().getName() + " start ");
            System.out.println(Thread.currentThread().getName() + " phaser.getPhase()=" + phaser.getPhase());
            phaser.awaitAdvance(0);
            System.out.println(Thread.currentThread().getName() + " end ");
    }
}

public static class ThreadD implements Runnable {
    private Phaser phaser;

    public ThreadD(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " begin sleep");

            Thread.sleep(5000);

            System.out.println(Thread.currentThread().getName() + " sleep completed ");
            phaser.arriveAndAwaitAdvance();

            System.out.println(Thread.currentThread().getName() + " end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {
    // 声明Phaser
    Phaser phaser = new Phaser(3) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println("Phaser arrived at :"+phase);
            return super.onAdvance(phase, registeredParties);
        }
    };

    Thread t1 = new Thread(new ThreadA(phaser));
    Thread t2 = new Thread(new ThreadB(phaser));
    Thread t3 = new Thread(new ThreadC(phaser));
    Thread t4 = new Thread(new ThreadD(phaser));

    t1.setName("ThreadA");
    t2.setName("ThreadB");
    t3.setName("ThreadC");
    t4.setName("ThreadD");

    t1.start();
    t2.start();
    t3.start();
    t4.start();
}

如上代码所示,声明Phaser有三个参与者ThreadA,ThreadB,ThreadD,在三个参与者都执行到arriveAndAwaitAdvance之前,ThreadC 阻塞等待,当三个参与者都执行到arriveAndAwaitAdvance后,回调onAdvance方法,此时被阻塞的参与者被唤醒执行,之后ThreadC被唤醒继续执行,运行结果如下:

1-4-5-2

Exchanger

Exchanger用于两个线程之间的通信,无论哪个线程先调用Exchanger,都会等待另外一个线程调用时进行数据交换,示例代码如下:

private static Exchanger<String> exchanger = new Exchanger<>();

public static void main(String[] args) {
    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+" sleep start");
            Thread.sleep(10000);
            System.out.println(Thread.currentThread().getName()+" sleep end");
            System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
            String aa = exchanger.exchange("data from Thread1");
            System.out.println(Thread.currentThread().getName() + "   "+aa);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "Thread1").start();

    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
            String bb = exchanger.exchange("data from Thread2");
            System.out.println(Thread.currentThread().getName() + "   "+bb);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "Thread2").start();
}

运行输出如下:

1-4-5-3

总结

结合前文,我们一共学习了种线程间通信方式,主要有:

  1. 1. Object.wait/Object.notify/Object.notifyAll + synchronized
  2. 2. Semaphore(信号量)
  3. 3. CountDownLatch
  4. 4. CyclicBarrier
  5. 5. Condition+ReentrantLock
  6. 6. Phaser
  7. 7. Exchanger

大家日常开发中可灵活使用,针对各通信方式比较见下表:

通信方式

应用场景

是否可重用

子任务异常处理

备注

Object.wait/Object.notify/Object.notifyAll + synchronized

大多数线程通信场景

依赖开发者维护,在finally块中完成释放,避免死锁

/

Semaphore(信号量)

通知唤醒类线程间通信场景

依赖开发者维护,在finally块中释放信号量,避免死锁

/

CountDownLatch

串行多线程运行场景

不加处理的话,子任务发生异常导致退出,则所有等待的线程都会一致等待,直到超时时间来临

/

CyclicBarrier

线程聚合类通信场景

不加处理的话,如果在所有线程都到达屏障陷入阻塞前,如果有线程发生异常导致未到达栅栏提前退出,则所有等待在栅栏都会以BrokenBarrierException或InterruptedException异常退出

/

Condition+ReentrantLock

大多数线程通信场景

依赖开发者维护,在finally块中完成释放,避免死锁

/

Phaser

适用CountDownLatch及CyclicBarrier组合场景

依赖开发者维护,在finally块中取消参与者,避免死锁

/

Exchanger

线程间数据交换场景

依赖开发者维护,确保两个线程状态正常,并行运行

/