您现在的位置是:首页 > Java 当前栏目 Java并发——Semaphore JAVA 并发 线程 2023-03-02 11:11:20 时间 ### Semaphore ### Semaphore即信号、信号系统。此类的主要作用就是限制线程并发的数量,如果不限制线程并发的数量,则CPU的资源很快就被耗尽,每个线程执行的任务是相当缓慢,因为CPU要把时间片分配给不同的线程对象,而且上下文切换也要耗时,最终造成系统运行效率大幅降低,所以限制并发线程的数量还是非常有必要的。 #### Semaphore的同步性 #### public class Service { private Semaphore semaphore = new Semaphore(1); public void testMethod() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "begin:" + System.currentTimeMillis()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + "end:" + System.currentTimeMillis()); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { Service service = new Service(); ThreadA a = new ThreadA(service); a.setName("A"); a.start(); ThreadB b = new ThreadB(service); b.setName("B"); b.start(); ThreadC c = new ThreadC(service); c.setName("C"); c.start(); } } class ThreadA extends Thread { private Service service; public ThreadA(Service service) { super(); this.service = service; } @Override public void run() { service.testMethod(); } } class ThreadB extends Thread { private Service service; public ThreadB(Service service) { super(); this.service = service; } @Override public void run() { service.testMethod(); } } class ThreadC extends Thread { private Service service; public ThreadC(Service service) { super(); this.service = service; } @Override public void run() { service.testMethod(); } } ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70] 类Semaphore的构造函数参数permits是许可的意思,代表同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码。无参方法acquire(的作用是使用1个许可,是减法操作。 其实还可以传人>1的许可,代表同-时间内,最多允许有x个线程可以执行acquire()和release()之间的代码。 acquire(int permits): 每调用一次方法,就使用permits个许可。 release(int permits): 每调用一次方法,就增加permits个许可,可以动态增加许可的个数: Semaphore semaphore = new Semaphore(5); semaphore.acquire(); semaphore.acquire(); semaphore.acquire(); semaphore.acquire(); semaphore.acquire(); System.out.println(semaphore.availablePermits()); semaphore.release(); semaphore.release(); semaphore.release(); semaphore.release(); semaphore.release(); semaphore.release(); System.out.println(semaphore.availablePermits()); semaphore.release(4); System.out.println(semaphore.availablePermits()); ![在这里插入图片描述][20200621235057360.png] acquireUninterruptibly(): 使等待进入acquire()方法的线程,不允许被中断。 public class Service2 { private Semaphore semaphore = new Semaphore(1); public void test() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "begin:" + System.currentTimeMillis()); for (int i = 0; i < Integer.MAX_VALUE / 100; i++) { String str = new String(); Math.random(); } System.out.println(Thread.currentThread().getName() + "end:" + System.currentTimeMillis()); semaphore.release(); } catch (InterruptedException e) { System.out.println("线程" + Thread.currentThread().getName() + "进入了Catch"); e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { Service2 service2 = new Service2(); ThreadA a = new ThreadA(service2); a.setName("A"); a.start(); ThreadB b = new ThreadB(service2); b.setName("B"); b.start(); b.interrupt(); System.out.println("main"); } } class ThreadA extends Thread { private Service2 service; public ThreadA(Service2 service) { super(); this.service = service; } @Override public void run() { service.test(); } } class ThreadB extends Thread { private Service2 service; public ThreadB(Service2 service) { super(); this.service = service; } @Override public void run() { service.test(); } } 线程B成功被中断。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 1] 修改acquire方法: ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 2] 再次运行效果如下: ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 3] acquireUninterruptibly()方法还有重载的写法`acquireUninterruptibly(int permits)`,此方法的作用是在等待许可的情况下不允许中断,如果成功获得锁,则取得指定的permits许可个数。 availablePermits()和drainPermits()方法: `availablePermits()`返回此Semaphore对象中当前可用的许可数,此方法通常用于调试,因为许可的数量有可能实时在改变,并不是固定的数量。 `drainPermits()`可获取并返回立即可用的所有许可个数,并且将可用许可置0。 Semaphore semaphore = new Semaphore(10); semaphore.acquire(); System.out.println(semaphore.drainPermits() + " " + semaphore.availablePermits()); System.out.println(semaphore.drainPermits() + " " + semaphore.availablePermits()); System.out.println(semaphore.drainPermits() + " " + semaphore.availablePermits()); semaphore.release(); ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 4] getQueueLength()和hasQueuedThreads(): 方法`getQueueLength()`的作用是取得等待许可的线程个数。 方法`hasQueuedThreads()`的作用是判断有没有线程在等待这个许可。 这两个方法通常都是在判断当前有没有等待许可的线程信息时使用。 public class MyService { private Semaphore semaphore = new Semaphore(1); public void test() { try { semaphore.acquire(); Thread.sleep(1000); System.out.println("还有大约" + semaphore.getQueueLength() + "个线程在等待"); System.out.println("是否有线程正在等待信号量?" + semaphore.hasQueuedThreads()); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } public static void main(String[] args) { MyService myService = new MyService(); MyThread firstThread = new MyThread(myService); firstThread.start(); MyThread[] arr = new MyThread[4]; for (int i = 0; i < 4; i++) { arr[i] = new MyThread(myService); arr[i].start(); } } } ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 5] 公平与非公平信号量: 有些时候,获得许可的顺序与线程启动的顺序有关,这时信号量就要分为公平与非公平的。所谓的公平信号量是获得锁的顺序与线程启动的顺序有关,但不代表100%地获得信号量,仅仅是在概率上能得到保证。而非公平信号量就是无关的了。 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } 构造中传入true,即为公平信号量;反之为非公平。公平信号量运行的效果是线程启动的顺序与调用`semaphore.acquire()`的顺序有关,也就是先启动的线程优先获得许可。 tryAcquire(): 无参方法tryAcquire()的作用是尝试地获得1个许可,如果获取不到则返回false,此方法通常与if语句结合使用,其具有无阻塞的特点。无阻塞的特点可以使线程不至于在同步处一直持续等待的状态,如果if语句判断不成立则线程会继续走else语句,程序会继续向下运行。 tryAcquire(int permits): 有参方法tryAcquire(int permits)的作用是尝试地获得permits个许可,如果获取不到则返回false。 tryAcquire(long timeout, TimeUnit unit): 有参方法tryAcquire(int long timeout,TimeUnit unit)的作用是在指定的时间内尝试地获得1个许可,如果获取不到则返回false。 tryAcquire(int permits, long timeout, TimeUnit unit): 有参方法tryAcquire(int permits,long timeout,TimeUnit unit)的作用是在指定的时间内尝试地获得permits个许可,如果获取不到则返回false。 #### 使用Semaphore实现多生产者/多消费者模式 #### public class RepastService { volatile private Semaphore setSemaphore = new Semaphore(10);//10个厨师 volatile private Semaphore getSemaphore = new Semaphore(20);//食客 volatile private ReentrantLock lock = new ReentrantLock(); volatile private Condition setCondition = lock.newCondition(); volatile private Condition getCondition = lock.newCondition(); //只有3个盘子放菜品 volatile private Object[] producePosition = new Object[4]; private boolean isEmpty() { boolean isEmty = true; for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] != null) { isEmty = false; break; } } if (isEmty == true) { return true; } else { return false; } } private boolean isFull() { boolean isFull = true; for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] == null) { isFull = false; break; } } return isFull; } public void set() { try { setSemaphore.acquire();//允许同时最多有10个厨师炒菜 lock.lock(); while (isFull()) { setCondition.await(); } for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] == null) { producePosition[i] = "菜"; System.out.println(Thread.currentThread().getName() + "生产了" + producePosition[i]); break; } } getCondition.signalAll(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { setSemaphore.release(); } } public void get() { try { getSemaphore.acquire();//允许同时最多20个食客 lock.lock(); while (isEmpty()) { getCondition.await(); } for (int i = 0; i < producePosition.length; i++) { if (producePosition[i] != null) { System.out.println(Thread.currentThread().getName() + "吃了" + producePosition[i]); producePosition[i] = null; break; } } setCondition.signalAll(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { getSemaphore.release(); } } public static void main(String[] args) throws InterruptedException { RepastService service = new RepastService(); ThreadP[] arrP = new ThreadP[60]; ThreadC[] arrC = new ThreadC[60]; for (int i = 0; i < 60; i++) { arrP[i] = new ThreadP(service); arrC[i] = new ThreadC(service); } Thread.sleep(2000); for (int i = 0; i < 60; i++) { arrP[i].start(); arrC[i].start(); } } } //厨师 class ThreadP extends Thread { private RepastService service; public ThreadP(RepastService service) { this.service = service; } @Override public void run() { service.set(); } } //食客 class ThreadC extends Thread { private RepastService service; public ThreadC(RepastService service) { this.service = service; } @Override public void run() { service.get(); } } ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 6] [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70]: /images/20210813/827fd6bc78b74c36bd4bc9fbc9425f18.png [20200621235057360.png]: /images/20210813/9183060d063e43269b503940bd06d775.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 1]: /images/20210813/2e3d718289ae4251b2abf45432aa444f.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 2]: /images/20210813/156a29165c1e4c8bb384aae0cbba82ba.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 3]: /images/20210813/7b91288118f14fc29a56c3ba7f17af73.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 4]: /images/20210813/cd822104a554422ca1f0bbf53e4f301c.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 5]: /images/20210813/2ba59a3ffc6f47d7adb2e5ee2f0b2dae.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NvbGRfX19wbGF5_size_16_color_FFFFFF_t_70 6]: /images/20210813/3a9fdbf491d04bb79dd5a2e1cd79931f.png 本文地址: Java并发——Semaphore 相关文章 基数排序java Java线程 Java线程 Java--线程 static变量 java java final关键字 java-抽象类 java 日期工具 获取Java编译器 Java 日期处理 java处理日期 Java 日期时间 Java构造方法 Java 构造方法 Java super继承 Java @Override理解 JAVA锁机制 java锁机制 Java instanceof 运算符 Java ConcurrentMap 接口