# 1. Semaphore 定义

个人理解同一时间内,限制指定数量线程通过

# 2. Semaphore 的同步性

a
package com.hc.thread.chapterOne.SemaPhore;
import java.util.concurrent.Semaphore;
/**
 * 同一时间内  限制多个线程通过
 */
public class SemaPhoreT {
    private Semaphore semaphore = new Semaphore(1);
    public void testMethod() {
        try {
            // 限制
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " begin time:" + System.currentTimeMillis());
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " end time:" + System.currentTimeMillis());
            // 释放
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

<img src="/img/thread/permits.png">

类 Semaphore 的构造函数 permits 可以理解为同时刻通过的线程许可数,代表同一时间内最多允许多少个线程同时执行
acquire () 和 release () 之间的代码
eg: 无参方法的作用是使用 1 个许可

a
public static void main(String[] args) {
        SemaPhoreT semaPhoreT = new SemaPhoreT();
        new Thread(new Runnable() {
            @Override
            public void run() {
                semaPhoreT.testMethod();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                semaPhoreT.testMethod();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                semaPhoreT.testMethod();
            }
        }).start();
    }

print:

h
Thread-0 begin time:1526479020027
Thread-0 end time:1526479021028
Thread-1 begin time:1526479021028
Thread-1 end time:1526479022028
Thread-2 begin time:1526479022028
Thread-2 end time:1526479023029

可以看到打印信息依次输出,如果给为 1 个许可相当于这一段的时候是单线程的

我们改改: private Semaphore semaphore = new Semaphore(2);
print:

h
Thread-0 begin time:1526479290849
Thread-1 begin time:1526479290850
Thread-0 end time:1526479291849
Thread-2 begin time:1526479291849
Thread-1 end time:1526479291850
Thread-2 end time:1526479292850

这个打印结果说明同一时刻是有 0 跟 1 两个线程通过 acquire () 和 release () 之间的

# 3. Semaphore 实现生产者、消费者

Semaphore 实现生产者、消费者模式的话还是比较简单的
我们以厨师、顾客来进行模拟这样一个场景,废话不多说,直接上代码:

a
package com.hc.thread.chapterOne.SemaPhore;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 *  Semaphore 实现生产者、消费者
 *  created by cheng on 2018/5/16
 */
public class RepastService {
    /**
     * 设置 5 个厨师(生产者)
     */
    private volatile Semaphore setSemaphore = new Semaphore(5);
    /**
     * 设置 10 个顾客(消费者)
     */
    private volatile Semaphore getSemaphore = new Semaphore(10);
    private volatile ReentrantLock reentrantLock = new ReentrantLock();
    private volatile Condition setCondition = reentrantLock.newCondition();
    private volatile Condition getCondition = reentrantLock.newCondition();
    /**
     * 最多一次上 4 盘菜
     */
    private volatile Object[] producePosition = new Object[4];
    private boolean isEmpty() {
        boolean isEmpty = true;
        for (int i = 0; i < producePosition.length; i++) {
            if (producePosition[i] != null) {
                isEmpty = false;
                break;
            }
        }
        return  isEmpty;
    }
    /**
     * 判断有没有空盘子
     * @return true:没有空盘子 反之则有
     */
    private boolean isFull() {
        boolean isFull = true;
        for (int i = 0; i < producePosition.length; i++) {
            if (producePosition[i] == null) {
                isFull = false;
                break;
            }
        }
        return  isFull;
    }
    public void set() {
        try {
            setSemaphore.acquire();
            reentrantLock.lock();
            while (isFull()) {
                // 没有空盘子  厨师要等待
                setCondition.await();
            }
            for (int i = 0; i < producePosition.length; i++) {
                if (producePosition[i] == null) {
                    // 发现有空盘子了  可以上菜
                    producePosition[i] = "xx菜";
                    System.out.println(Thread.currentThread().getName() + " 生产了 " + producePosition[i]);
                    break;
                }
            }
            // 上菜
            getCondition.signalAll();
            reentrantLock.unlock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            setSemaphore.release();
        }
    }
    public void get() {
        try {
            getSemaphore.acquire();
            reentrantLock.lock();
            while (isEmpty()) {
                // 菜已经上齐了  没盘子装了  吃完了才能上
                getCondition.await();
            }
            for (int i = 0; i < producePosition.length; i++) {
                if (producePosition[i] != null) {
                    // 发现有菜上来  可以开饭了
                    System.out.println(Thread.currentThread().getName() + " 消费了 " + producePosition[i]);
                    producePosition[i] = null;
                    break;
                }
            }
            // 端盘子下去
            setCondition.signalAll();
            reentrantLock.unlock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            getSemaphore.release();
        }
    }
}

启动类 Run.java

a
package com.hc.thread.chapterOne.SemaPhore;
public class Run {
    public static void main(String[] args) throws InterruptedException {
        RepastService repastService = new RepastService();
        ThreadP[] arrayP = new ThreadP[20];
        ThreadC[] arrayC = new ThreadC[20];
        for (int i = 0; i < 20; i++) {
            arrayP[i] = new ThreadP(repastService);
            arrayC[i] = new ThreadC(repastService);
        }
        Thread.sleep(2000);
        for (int i = 0; i < 20; i++) {
            arrayP[i].start();
            arrayC[i].start();
        }
    }
}

线程类 ThreadP.java

a
package com.hc.thread.chapterOne.SemaPhore;
public class ThreadP extends Thread {
    private RepastService service;
    public ThreadP(RepastService service) {
        super();
        this.service = service;
    }
    @Override
    public void run() {
        service.set();
    }
}

ReentrantLock Condition 大家不明白的先自行度娘,后面我抽空再补上,emmmm...
ReentrantLock 在这里再做一重锁的判断,确保生产者跟消费者都是平衡的
如果不加 ReentrantLock 会怎么样,因为我们是用的 Condition 进行一个相互唤醒的操作,不用 ReentrantLock 的话可能会报 IllegalMonitorStateException 的异常

输出的话就给大家截个图:
<img src="/img/thread/out.png">

# 4. 总结

Semaphore semaphore = new Semaphore (1) 其实只是初始化多少个许可
acquire () 相当于动态的减少许可,相应的 release () 可以动态的增加许可
Semaphore 提供的限制并发线程的功能,此功能在默认的 synchronized 种是不提供的

源码地址
源码在 thread 目录下,还有一些其他 demo, 大家可自行学习

更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

vayi 微信支付

微信支付

vayi 支付宝

支付宝

vayi 贝宝

贝宝