zl程序教程

您现在的位置是:首页 >  后端

当前栏目

【Java】线程通信

2023-06-13 09:15:29 时间

1. 线程通信的理解

当我们需要多个线程来共同完成同一个任务,并且我们希望他们有规律的执行,那么多线程之间久需要一些通信机制。可以协调他们的工作,以此实现多线程之间共同操作同一份数据

比如:线程A用来生产包子的,线程B用来吃包子的,包子可以理解为同一资源,线程A与线程B处理的动作,一个是生产,一个是消费,此时B线程必须等到A线程完成后才能执行,那么线程A与线程B之间就需要线程通信,即—— 等待唤醒机制

2. 涉及的3个方法

方法

描述

wait()

使线程进行等待状态,并释放锁

notify()

唤醒正待等待的线程

notifyAll()

唤醒所有正在等待的锁

注意点:

  • 这3个方法的使用,必须是在synchronized同步代码块或同步方法中,Lock的线程通信方法如下:
    • private Lock lock = new ReentrantLock(); public Condition condition = lock.newCondition();
    • condition.await():使线程进行等待状态,并释放锁
    • condition.signal():唤醒正待等待的线程
    • condition.signalAll():唤醒所有正在等待的锁
  • 此3个方法的调用者,必须是同步监视器,否则会报IllegalMonitorStateException。
  • 此3个方法是Object类下的,因而可以在任意对象中使用。

3. wait() 和 sleep() 的区别?

相同点: 一旦执行,都会进入阻塞状态。

不同点:

  • 声明位置:
    • wait():声明在Object类中
    • sleep():声明在Thread类中,静态的
  • 使用场景
    • wait():只能用在 synchronized 的同步方法或同步代码块中
    • sleep():可以在任何地方使用
  • 在synchronized中使用
    • wait():一旦执行,会释放锁
    • sleep():一旦执行,不会释放锁
  • 结束阻塞方式:
    • wait():到达时间自动结束阻塞,或 通过notify()唤醒结束阻塞
    • sleep():到达时间自动结束阻塞

4.生产者&消费者DEMO

等待唤醒机制可以解决经典的“生产者与消费者”的问题。生产者与消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个(多个)共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

举例:

生产者(Productor)将产品交给店员(Clerk),而消费者(Customer)从店员处取走产品,店员一次只能持有固定数量的产品(比如:20),如果生产者试图生产更多的产品,店员会叫生产者停一下,如果店中有空位放产品了再通知生产者继续生产;如果店中没有产品了,店员会告诉消费者等一下,如果店中有产品了再通知消费者来取走产品。

类似的场景,比如厨师和服务员等。

生产者与消费者问题中其实隐含了两个问题:

  • 线程安全问题:因为生产者与消费者共享数据缓冲区,产生安全问题。不过这个问题可以使用同步解决。
  • 线程的协调工作问题:
    • 要解决该问题,就必须让生产者线程在缓冲区满时等待(wait),暂停进入阻塞状态,等到下次消费者消耗了缓冲区中的数据的时候,通知(notify)正在等待的线程恢复到就绪状态,重新开始往缓冲区添加数据。同样,也可以让消费者线程在缓冲区空时进入等待(wait),暂停进入阻塞状态,等到生产者往缓冲区添加数据之后,再通知(notify)正在等待的线程恢复到就绪状态。通过这样的通信机制来解决此类问题。

代码实现:

package priv.kuki.thread;

/**
 * @author Tang-J-L
 * 生产者&消费者
 * 生产者(Producer)将产品交给店员(Clerk),而消费者(Customer)从店员处取走产品,店员一次只能持有固定数量的产品(比如:20),
 * 如果生产者试图生产更多的产品,店员会叫生产者停一下;如果店中还有空位放产品了再通知生产者继续生产,如果店中没有产品了,店员会告诉消费者等一下,
 * 如果店中有产品了再通知消费者来取走产品。
 *
 * 分析:
 *  1.两个线程:生产者、消费者
 *  2.共享数据:产品数量
 *  3.线程之间存在通信
 *  4.存在线程安全问题,需要解决(因为有共享数据)
 *
 *  TODO:
 *      1.创建Clear、Producer、Customer三个类
 *      2.产品数量由店员维护(提供增加,减少产品数量的方法)
 *      3.
 *
 * @Date 2023/2/28 周二 14:59
 */

class Clerk{

    /**
     * 维护商品数量
     */
    private int productNum = 0;

    /**
     * 商品最大数量,避免魔法值
     */
    private static final int MAX_NUMBER = 20;

    /**
     * 商品最低数量,避免魔法值
     */
    private static final int MIN_NUMBER = 0;

    /**
     * 商品增加
     */
    public synchronized void increase(){

        if (productNum >= MAX_NUMBER) {

            try{
                // 等待
                wait();
            } catch(InterruptedException e){
                e.printStackTrace();
            }

        }else {

            productNum++;
            System.out.println(Thread.currentThread().getName()+"生产了第"+productNum+"个商品");

            // 唤醒消费者
            this.notifyAll();
        }
    }

    /**
     * 商品数量减少
     */
    public synchronized void reduce(){

        if (productNum <= MIN_NUMBER) {

            try{
                // 等待
                wait();
            } catch(InterruptedException e){
                e.printStackTrace();
            }

        }else {

            System.out.println(Thread.currentThread().getName()+"消费了第"+productNum+"个商品");
            productNum--;

            // 唤醒生产者
            this.notifyAll();
        }
    }
}

/**
 * 生产者
 */
class Producer extends Thread{

    private Clerk clerk;

    Producer(Clerk clerk){
        this.clerk = clerk;
    }

    @Override
    public void run(){

        while(true){

            System.out.println("生产者开始生产商品...");

            try{
                Thread.sleep(50);
            } catch(InterruptedException e){
                throw new RuntimeException(e);
            }

            clerk.increase();

        }
    }
}

/**
 * 消费者
 */
class Customer extends Thread{

    private Clerk clerk;

    Customer(Clerk clerk){
        this.clerk = clerk;
    }

    @Override
    public synchronized void run(){

        while(true){
            System.out.println("消费者开始消费商品...");

            try{
                Thread.sleep(1000);
            } catch(InterruptedException e){
                throw new RuntimeException(e);
            }

            clerk.reduce();
        }

    }
}

/**
 * 测试类
 * @author Tang-J-L
 */
public class ProducerAndCustomerTest{

    public static void main(String[] args){

		// 共享数据,通过同一个数据创建线程,实现数据共享
        Clerk clerk = new Clerk();

        Producer producer = new Producer(clerk);
        Customer customer1 = new Customer(clerk);
        Customer customer2 = new Customer(clerk);

        producer.setName("生产者");
        customer1.setName("消费者1");
        customer2.setName("消费者2");

        producer.start();
        customer1.start();
        customer2.start();

    }

}