zl程序教程

您现在的位置是:首页 >  后端

当前栏目

线程通信的生产者消费者模式

线程模式通信 消费者 生产者
2023-06-13 09:14:28 时间

小知识:在多线程中判断等待条件时不要用 if,而要用 while 循环;线程被唤醒后必须重新检查条件,以防止虚假唤醒(spurious wakeup)。

普通版:

package com.an.learning.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Shared resource for the producer/consumer demo: a counter that alternates
 * between 0 and 1, guarded by one {@link Lock} and one {@link Condition}.
 *
 * <p>Both operations follow the same pattern: check the condition in a
 * {@code while} loop, do the work, then notify the waiting threads.
 */
class ShareData { // resource class

    private int number = 0;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    /**
     * Produces one item: waits until the counter is 0, increments it, prints
     * the current thread's name and the new value, then wakes all waiters.
     *
     * @throws Exception an {@link InterruptedException} if the calling thread
     *         is interrupted while waiting; the counter is left unchanged and
     *         the lock is released in that case
     */
    public void increment() throws Exception {
        lock.lock();
        try {
            // Re-check in a loop: a woken thread must verify the condition
            // again to guard against spurious wakeups.
            while (number != 0) {
                // Cannot produce yet; wait for the consumer.
                condition.await();
            }
            // Produce.
            this.number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            // Notify.
            condition.signalAll();
        } finally {
            // No catch here on purpose: an interrupt must reach the caller
            // instead of being printed and silently dropped.
            lock.unlock();
        }
    }

    /**
     * Consumes one item: waits until the counter is non-zero, decrements it,
     * prints the current thread's name and the new value, then wakes all
     * waiters.
     *
     * @throws Exception an {@link InterruptedException} if the calling thread
     *         is interrupted while waiting; the counter is left unchanged and
     *         the lock is released in that case
     */
    public void decrement() throws Exception {
        lock.lock();
        try {
            // Re-check in a loop to guard against spurious wakeups.
            while (number == 0) {
                // Nothing to consume yet; wait for the producer.
                condition.await();
            }
            // Consume.
            this.number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            // Notify.
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

}

/**
 * Traditional (Lock/Condition based) producer-consumer demo.
 *
 * <p>Exercise: a variable starts at zero and two threads operate on it in
 * turn, one adding 1 and the other subtracting 1, for 5 rounds.
 *
 * <ol>
 *   <li>thread -> operation -> resource class</li>
 *   <li>check -> work -> notify</li>
 *   <li>guard against spurious wakeups</li>
 * </ol>
 *
 * <p>Project: learning. Created 2020/8/14 10:27 AM.
 *
 * @author Anzepeng
 */
public class ProdConsumer_TraditionDemo {

    public static void main(String[] args) {
        final ShareData resource = new ShareData();

        // Producer task: five increments, each failure reported on its own.
        Runnable producerTask = () -> {
            int round = 0;
            while (round < 5) {
                try {
                    resource.increment();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
                round++;
            }
        };
        Thread producer = new Thread(producerTask, "AA");
        producer.start();

        // Consumer task: five decrements, mirroring the producer.
        Runnable consumerTask = () -> {
            int round = 0;
            while (round < 5) {
                try {
                    resource.decrement();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
                round++;
            }
        };
        Thread consumer = new Thread(consumerTask, "BB");
        consumer.start();
    }
}

队列版:

使用阻塞队列

package com.an.learning.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Shared resource for the blocking-queue producer/consumer demo.
 *
 * <p>A single volatile flag controls both loops: the producer keeps offering
 * consecutive numbers (as strings) while the flag is set, and the consumer
 * keeps polling while it is set. The flag is cleared either by {@link #stop()}
 * or by the consumer itself when a poll times out.
 */
class MyResource {

    private volatile boolean FLAG = true; // production/consumption enabled by default
    private final AtomicInteger atomicInteger = new AtomicInteger();
    BlockingQueue<String> blockingQueue = null;

    /**
     * Stores the queue to work on and prints its implementation class name.
     *
     * @param blockingQueue queue shared by producer and consumer; the raw
     *        parameter type is kept so existing callers continue to compile
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter kept for caller compatibility
    public MyResource(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName()); // print the implementation class name
    }

    /**
     * Producer loop: while the flag is set, offers the next counter value to
     * the queue (waiting up to 2 seconds), reports whether the insert
     * succeeded, and then sleeps for 1 second.
     *
     * @throws Exception an {@link InterruptedException} if the thread is
     *         interrupted while offering or sleeping
     */
    public void MyProd() throws Exception {
        while (FLAG) {
            String data = atomicInteger.incrementAndGet() + "";
            // Use the value offer() actually returns: false means the queue
            // stayed full for the whole timeout and the element was dropped.
            boolean retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if (retValue) {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "\t生产结束了");
    }

    /**
     * Consumer loop: while the flag is set, polls the queue (waiting up to
     * 2 seconds). A {@code null} result clears the flag, which also ends the
     * producer loop, and returns; otherwise the consumed element is reported.
     *
     * @throws Exception an {@link InterruptedException} if the thread is
     *         interrupted while polling
     */
    public void MyConsumer() throws Exception {
        while (FLAG) {
            String result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (result == null) {
                // Nothing arrived within the timeout: shut everything down.
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "消费为空,停止");
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t消费队列" + result + "成功");
        }
    }

    /** Clears the flag so both loops finish after their current iteration. */
    public void stop() {
        this.FLAG = false;
    }
}

/**
 * Blocking-queue based producer-consumer demo.
 *
 * <p>Starts one producer thread and one consumer thread on a shared
 * {@code MyResource} backed by an {@link ArrayBlockingQueue} of capacity 10,
 * lets them run for 10 seconds, then asks the resource to stop.
 *
 * <p>Project: learning. Created 2020/8/14 5:12 PM.
 *
 * @author Anzepeng
 */
public class PRodConsumer_BlockQueueDemo {

    public static void main(String[] args) {
        // Typed queue instead of a raw ArrayBlockingQueue.
        MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
        new Thread(() -> {
            System.out.println("生产开始");

            try {
                myResource.MyProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "prod").start();
        new Thread(() -> {
            System.out.println("消费开始");

            try {
                myResource.MyConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "consumer").start();

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status so it is not lost; the stop
            // request below is still issued.
            Thread.currentThread().interrupt();
        }
        System.out.println("叫停");
        System.out.println();
        System.out.println();

        myResource.stop();
    }
}