zl程序教程

您现在的位置是:首页 >  其它

当前栏目

8-并发包

发包
2023-06-13 09:14:07 时间

并发包

概念

在实际开发中不考虑线程安全的情况下,一般不需要做线程安全处理,防止过多的处理导致性能变差

但是开发中有很多业务需要考虑线程安全的相关问题,此时就必须考虑线程安全的处理

Java为很多业务场景提供了性能优异,且线程安全的并发包

ConcurrentHashMap

package ConcurrentHashMap;

import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CHMDemo1 {

    //public static Map<String,String> maps=new HashMap<>();
    //public static Map<String,String> maps=new Hashtable<>();
    public static Map<String,String> maps=new ConcurrentHashMap<>();

    public static void main(String[] args) {
        Runnable target = new CHMRunnable();
        Thread t1 = new Thread(target);
        Thread t2 = new Thread(target);
        t1.start();
        t2.start();


        /*
        * 注意:首先这里不可以直接简化用new Thread(target).start();开启线程
        * 两个线程必须实例化对象,这是由于后续要用到join()方法
        * join方法的作用在于避免主线程和t1,t2两个线程争抢CPU
        * 使用join方法后,在t1,t2执行完之前,后续主线程都不会执行
        * 这就避免了提前打出maps长度导致结果并不是最终运行的结果
        * 同时t1,t2都用到了join方法,所以二者之间仍然是并发执行
        * */
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("maps长度="+maps.size());
        /*
        * 可以看到,当使用hashmap类型时,最终的执行结果并不是1000000
        * 这是由于单纯通过hashmap算法可能出现两个线程同时插入到同一线程的情况
        * HashMap时线程不安全的,性能好
        *
        * 可以看到,当使用hashtable类型时,最终执行结果始终都是1000000
        * 可见hashtable保证了线程的安全性,但是这种方式效率极低,在项目中基本被舍弃
        * 因为它的实现方式在每个方法中都加上了锁
        * HashTable时线程安全的,性能较差
        *
        * ConcurrentHashMap的实现与hashtable不同,它只锁住对应变量的桶(分段式锁)
        * 所以其他变量的增删改不会相互影响,效率得到了极大的保证
        * 保证了线程安全,综合性能较好
        * */
    }
}


class CHMRunnable implements Runnable{

    @Override
    public void run() {
        for(int i=0;i<500000;i++){
            CHMDemo1.maps.put(Thread.currentThread().getName()+i,Thread.currentThread().getName()+i);
        }
    }
}

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作,再执行自己

e.g. 线程1执行A任务和C任务,线程2执行B任务,且根据规定必须按照A,B,C的顺序执行任务:

所以就可以利用CountDownLatch保证在线程1执行完任务A后等待线程2执行完毕再执行剩余任务

package ConcurrentHashMap;

//CountDownLatch

import java.util.concurrent.CountDownLatch;

public class CDLDemo {
    public static void main(String[] args) {
        //创建CountDownLatch对象,用于监督1,2线程的执行情况
        //这里传入的参数是步数,表示唤醒线程需要的步数,部署为0时,等待的线程开始执行
        //这种方法比线程通信更加灵活
        CountDownLatch c=new CountDownLatch(1);
        new Thread_first(c).start();
        new Thread_second(c).start();
    }
}

class Thread_first extends Thread{
    private CountDownLatch c;
    public Thread_first(CountDownLatch c) {
        this.c=c;

    }

    @Override
    public void run() {
        System.out.println("A");
        //等待状态,让当前线程让出CPU等待
        try {
            c.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("C");
    }
}

class Thread_second extends Thread{
    private CountDownLatch c;
    public Thread_second(CountDownLatch c) {
        this.c=c;

    }

    @Override
    public void run() {
        System.out.println("B");
        c.countDown();  //让计数器减一,当计数器为0时唤醒等待的线程
    }
}

可以看到,在使用countdownlatch之前,最终结果是ACB,使用countdownlatch之后,最终结果是ABC

CyclicBarrier

某个线程任务必须等待其他线程执行完毕以后才能最终触发自己执行

package ConcurrentHashMap;

//CyclicBarrier

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CBDemo {
    public static void main(String[] args) {
        //创建任务循环屏障对象,等到5个线程全部执行完毕之后触发一次任务
        // 第一个参数表示等待线程执行的个数,第二个参数表示执行的任务
        CyclicBarrier c=new CyclicBarrier(5,new CBRunnable());
        for (int i = 0; i < 5; i++) {
            new CBThread("用户"+i,c).start();
        }

    }
}

class CBRunnable implements Runnable{

    @Override
    public void run() {
        System.out.println("五个任务全部执行完毕,"+Thread.currentThread().getName()+"开始执行!");
    }
}

class CBThread extends Thread{
    private CyclicBarrier c;
    public CBThread(String s,CyclicBarrier c) {
        super(s);
        this.c=c;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+"正在执行!");
            c.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果

用户3正在执行!
用户1正在执行!
用户2正在执行!
用户0正在执行!
用户4正在执行!
五个任务全部执行完毕,用户3开始执行!

当修改代码中的for循环为10次时,可以看到最终运行结果为:

用户4正在执行!
用户6正在执行!
用户1正在执行!
用户5正在执行!
用户3正在执行!
用户2正在执行!
用户8正在执行!
用户7正在执行!
用户0正在执行!
用户9正在执行!
五个任务全部执行完毕,用户7开始执行!
五个任务全部执行完毕,用户9开始执行!

可以看到每个线程调用await()方法告诉c自己已经运行完成,然后当前线程被回收(注意,不是被销毁,如果是销毁则后续c执行传入的任务时获取当前线程得到的必然不是这些已经完成命名的线程),并且循环屏障是达到一组屏障就触发一次任务的执行,而不一定只执行一次

Semaphore

主要作用是控制线程的并发数量,synchronized可以起到“锁”的作用,保证在某个时间段内,只允许一个线程访问,而Seamaphore则可以设置同时允许几个线程进行。

Semaphore字面意思是信号量的意思,作用就是控制某段时间内访问特定资源的线程数目

package ConcurrentHashMap;

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        SemaTask task=new SemaTask();
        for (int i = 0; i < 10; i++) {
            new SemaThread(task).start();
        }

    }
}


//任务代码
class SemaTask{
    //创建Semaphore对象,第一个参数表示允许执行acquire和release之间内容的线程数量
    private Semaphore sema=new Semaphore(2);
    public void task(){
        try {
            sema.acquire();
            System.out.println(Thread.currentThread().getName()+"进入时间:"+System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"离开时间:"+System.currentTimeMillis());
            sema.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class SemaThread extends Thread{
    private SemaTask task;
    public SemaThread(SemaTask task){
        this.task=task;
    }
    @Override
    public void run() {
        task.task();
    }
}

Exchanger

一个用于线程间协作的工具类,Exchanger用于线程间的数据交换。

两个线程通过exchanger方法交换数据,如果第一个线程先执行exchanger方法,它会一直等待第二个线程也执行exchanger方法,当两个线程都达到同步点时(程序执行完毕),这两个线程就可以进行数据交换,将本线程生产出来的数据传递给对方

package ConcurrentHashMap;

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args) {
        //创建Exchanger交换数据
        Exchanger<String> exch=new Exchanger<String>();
        Thread t1=new task("task1",exch,"你好,我是task1");
        Thread t2=new task("task2",exch,"你好,我是task2");
        t1.start();
        t2.start();
    }
}

class task extends Thread{
    private Exchanger<String> exch;
    private String result;
    public task(String name,Exchanger exch,String result){
        super(name);
        this.exch=exch;
        this.result=result;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"正在执行工作流程");
        //交换结果
        try {
            String s=exch.exchange(result);
            System.out.println(Thread.currentThread().getName()+"收到交换信息“"+s+"”");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果

task2正在执行工作流程
task1正在执行工作流程
task2收到交换信息“你好,我是task1”
task1收到交换信息“你好,我是task2”