jUC常见并发流程工具总结
使用示例
- 5个线程都完成工作后继续后续逻辑
用5个线程模拟5个工人工作,只有5个工人都完成工作后才能输出所有工作都完成。
java
public class WorkerCountDown {
private static Logger logger = LoggerFactory.getLogger(WorkerCountDown.class);
public static void main(String[] args) {
int workerSize = 5;
CountDownLatch workCount = new CountDownLatch(workerSize);
ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);
for (int i = 0; i < workerSize; i++) {
final int workerNum = i;
//5个工人输出完成工作后,扣减倒计时门闩数
threadPool.submit(() -> {
logger.info("worker[{}]完成手头的工作", workerNum);
workCount.countDown();
});
}
try {
//阻塞当前线程(主线程)往后走,只有倒计时门闩变为0之后才能继续后续逻辑
workCount.await();
} catch (InterruptedException e) {
logger.info("倒计时门闩阻塞失败,失败原因[{}]", e.getMessage(), e);
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
logger.info("所有工人都完成手头的工作了");
}
}
输出结果
java
[pool-1-thread-2] INFO com.guide.thread.base.WorkerCountDown - worker[1]完成手头的工作
[pool-1-thread-3] INFO com.guide.thread.base.WorkerCountDown - worker[2]完成手头的工作
[pool-1-thread-5] INFO com.guide.thread.base.WorkerCountDown - worker[4]完成手头的工作
[pool-1-thread-1] INFO com.guide.thread.base.WorkerCountDown - worker[0]完成手头的工作
[pool-1-thread-4] INFO com.guide.thread.base.WorkerCountDown - worker[3]完成手头的工作
[main] INFO com.guide.thread.base.WorkerCountDown - 所有工人都完成手头的工作了
- 模拟运动员赛跑
我们将一个个线程比作运动员,我们希望希望倒计时门闩变为0时,10个线程并发工作。
java
public class Racing {
private static Logger logger = LoggerFactory.getLogger(Racing.class);
public static void main(String[] args) {
logger.info("百米跑比赛开始");
int playerNum = 10;
CountDownLatch gun = new CountDownLatch(1);
ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
for (int i = 0; i < playerNum; i++) {
final int playNo = i;
threadPool.submit(() -> {
logger.info("[{}]号运动员已就绪", playNo);
try {
gun.await();
} catch (InterruptedException e) {
logger.info("[{}]号运动员线程阻塞失败,失败原因[{}]", playNo, e.getMessage(), e);
}
logger.info("[{}]号运动员已经到达重点", playNo);
});
}
gun.countDown();
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
logger.info("百米赛跑已结束");
}
}
输出结果
java
[main] INFO com.guide.thread.base.Racing - 百米跑比赛开始
[pool-1-thread-3] INFO com.guide.thread.base.Racing - [2]号运动员已就绪
[pool-1-thread-9] INFO com.guide.thread.base.Racing - [8]号运动员已就绪
[pool-1-thread-4] INFO com.guide.thread.base.Racing - [3]号运动员已就绪
[pool-1-thread-7] INFO com.guide.thread.base.Racing - [6]号运动员已就绪
[pool-1-thread-2] INFO com.guide.thread.base.Racing - [1]号运动员已就绪
[pool-1-thread-8] INFO com.guide.thread.base.Racing - [7]号运动员已就绪
[pool-1-thread-1] INFO com.guide.thread.base.Racing - [0]号运动员已就绪
[pool-1-thread-5] INFO com.guide.thread.base.Racing - [4]号运动员已就绪
[pool-1-thread-6] INFO com.guide.thread.base.Racing - [5]号运动员已就绪
[pool-1-thread-10] INFO com.guide.thread.base.Racing - [9]号运动员已就绪
[pool-1-thread-6] INFO com.guide.thread.base.Racing - [5]号运动员已经到达重点
[pool-1-thread-5] INFO com.guide.thread.base.Racing - [4]号运动员已经到达重点
[pool-1-thread-1] INFO com.guide.thread.base.Racing - [0]号运动员已经到达重点
[pool-1-thread-8] INFO com.guide.thread.base.Racing - [7]号运动员已经到达重点
[pool-1-thread-2] INFO com.guide.thread.base.Racing - [1]号运动员已经到达重点
[pool-1-thread-7] INFO com.guide.thread.base.Racing - [6]号运动员已经到达重点
[pool-1-thread-4] INFO com.guide.thread.base.Racing - [3]号运动员已经到达重点
[pool-1-thread-9] INFO com.guide.thread.base.Racing - [8]号运动员已经到达重点
[pool-1-thread-3] INFO com.guide.thread.base.Racing - [2]号运动员已经到达重点
[pool-1-thread-10] INFO com.guide.thread.base.Racing - [9]号运动员已经到达重点
[main] INFO com.guide.thread.base.Racing - 百米赛跑已结束
信号量常用于控制多线程争抢有些资源后才能工作的场景。例如我们给信号量设置资源总数为3,只有拿到3个资源的线程才能进行工作,其他线程必须等待使用使用资源的线程释放后才能进行争抢。
信号量基础使用示例
如下所示,我们设置信号量初始化3个许可证,并且设置为公平true
,这就意味着资源被线程争抢完后,后续线程必须阻塞等待,只有前一个线程用完许可资源释放足量空闲的资源后,才能继续争抢。
java
public class SemaphoreDemo {
//默认3个资源,并且公平争抢
private static Semaphore semaphore = new Semaphore(3,true);
private static Logger logger = LoggerFactory.getLogger(SemaphoreDemo.class);
public static void main(String[] args) {
int workSize = 5;
ExecutorService executorService = Executors.newFixedThreadPool(workSize);
for (int i = 0; i < workSize; i++) {
executorService.submit(() -> {
try {
semaphore.acquire(3);
logger.info("进行业务逻辑处理");
semaphore.release(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
while (!executorService.isTerminated()) {
}
}
}
注意事项
- 获取和释放的时候都可以指定数量,但是要保持一致。
- 公平性设置为true会更加合理
- 并不必须由获取许可证的线程释放许可证。可以是A获取,B释放。
当A线程需要等待某个条件的时候,它就去执行condition.await()
方法,一旦执行了await()
方法,线程就会进入阻塞状态。
如果线程B
执行condition.signal()
方法,则JVM
就会从被阻塞线程中找到等待该condition
的线程。当线程A收到可执行信号的时候,他的线程状态就会变成Runnable
可执行状态。
基础使用示例
java
public class ConditionObj {
private static Logger logger = LoggerFactory.getLogger(ConditionObj.class);
private ReentrantLock lock = new ReentrantLock();
//条件对象,操控线程的等待和通知
private Condition condition = lock.newCondition();
public void waitCondition() throws InterruptedException {
lock.lock();
try {
logger.info("等待达到条件后通知");
condition.await();
logger.info("收到通知,开始执行业务逻辑");
} finally {
lock.unlock();
logger.info("执行完成,释放锁");
}
}
public void notifyCondition() throws InterruptedException {
lock.lock();
try {
logger.info("达到条件发起通知");
condition.signal();
logger.info("发起通知结束");
} finally {
lock.unlock();
logger.info("发起通知执行完成,释放锁");
}
}
public static void main(String[] args) throws InterruptedException {
ConditionObj obj = new ConditionObj();
new Thread(() -> {
try {
obj.waitCondition();
//让出CPU时间片,交给主线程发起通知
Thread.sleep(3000);
} catch (InterruptedException e) {
logger.error("等待条件通知设置失败,失败原因 [{}]", e.getMessage(), e);
}
}).start();
Thread.sleep(3000);
obj.notifyCondition();
}
}
基于条件对象完成生产者、消费者模式
我们假设用一个队列存放一波生产者生产的资源,当资源满了通知消费者消费。当消费者消费空了,通知生产者生产。
所以这时候使用condition控制流程最合适。所以我们要定义两个信号,分别为:
- 资源未满(notFull): 生产者唤醒,消费者挂起。
- 资源为空(notEmpty):生产者挂起,消费者唤醒。
最终示例代码如下
java
public class ProducerMode {
private static Logger logger = LoggerFactory.getLogger(ProducerMode.class);
//锁
private static ReentrantLock lock = new ReentrantLock();
// 资源未满
private Condition notFull = lock.newCondition();
//资源为空
private Condition notEmpty = lock.newCondition();
private Queue<Integer> queue = new PriorityQueue<>(10);
private int queueMaxSize = 10;
/**
* 生产者
*/
private class Producer extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
if (queueMaxSize == queue.size()) {
logger.info("当前队列已满,通知消费者消费");
//等待不满条件触发
notFull.await();
}
queue.offer(1);
logger.info("生产者补货,当前队列有 【{}】", queue.size());
//通知消费者队列不空,可以消费
notEmpty.signal();
} catch (Exception e) {
logger.error("生产者报错,失败原因 [{}]", e.getMessage(), e);
} finally {
lock.unlock();
}
}
}
}
/**
* 消费者
*/
private class Consumer extends Thread {
@Override
public void run() {
while (true) {
lock.lock();
try {
if (0 == queue.size()) {
logger.info("当前队列已空,通知生产者补货");
//等待不空条件达到
notEmpty.await();
}
queue.poll();
//通知消费者不满了
notFull.signal();
logger.info("消费者完成消费,当前队列还剩余 【{}】个元素", queue.size());
} catch (Exception e) {
logger.error("生产者报错,失败原因 [{}]", e.getMessage(), e);
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ProducerMode mode = new ProducerMode();
Producer producer = mode.new Producer();
Consumer consumer = mode.new Consumer();
producer.start();
consumer.start();
}
}
输出结果
java
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【1】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【2】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【3】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【4】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【5】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【6】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【7】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【8】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【9】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【10】
[Thread-0] INFO com.guide.thread.base.ProducerMode - 当前队列已满,通知消费者消费
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【9】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【8】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【7】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【6】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【5】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【4】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【3】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【2】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【1】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【0】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 当前队列已空,通知生产者补货
循环栅栏对象
直到指定数量的线程都到达同一个点,然后才一起继续执行。
基础示例
java
public class CyclicBarrierDemo {
private static Logger logger = LoggerFactory.getLogger(CyclicBarrierDemo.class);
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("到齐5个人,准备触发");
});
int poolSize = 10;
ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
for (int i = 0; i < poolSize; i++) {
threadPool.submit(() -> {
String name = Thread.currentThread().getName();
logger.info("线程[{}],已准备就绪,等待一波人员到期就出发", name);
try {
cyclicBarrier.await();
} catch (Exception e) {
logger.info("线程阻塞失败,失败原因 [{}]", e.getMessage(), e);
}
logger.info("已就绪5个线程,[{}]准备触发", name);
});
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
}
}
}
输出结果
java
[pool-1-thread-6] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-6],已准备就绪,等待一波人员到期就出发
[pool-1-thread-9] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-9],已准备就绪,等待一波人员到期就出发
[pool-1-thread-8] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-8],已准备就绪,等待一波人员到期就出发
[pool-1-thread-1] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-1],已准备就绪,等待一波人员到期就出发
[pool-1-thread-5] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-5],已准备就绪,等待一波人员到期就出发
[pool-1-thread-10] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-10],已准备就绪,等待一波人员到期就出发
[pool-1-thread-2] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-2],已准备就绪,等待一波人员到期就出发
[pool-1-thread-7] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-7],已准备就绪,等待一波人员到期就出发
[pool-1-thread-4] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-4],已准备就绪,等待一波人员到期就出发
[pool-1-thread-3] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-3],已准备就绪,等待一波人员到期就出发
[pool-1-thread-5] INFO com.guide.thread.base.CyclicBarrierDemo - 到齐5个人,准备触发
[pool-1-thread-5] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-5]准备触发
[pool-1-thread-3] INFO com.guide.thread.base.CyclicBarrierDemo - 到齐5个人,准备触发
[pool-1-thread-3] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-3]准备触发
[pool-1-thread-6] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-6]准备触发
[pool-1-thread-9] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-9]准备触发
[pool-1-thread-1] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-1]准备触发
[pool-1-thread-8] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-8]准备触发
[pool-1-thread-2] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-2]准备触发
[pool-1-thread-7] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-7]准备触发
[pool-1-thread-10] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-10]准备触发
[pool-1-thread-4] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-4]准备触发
CountDownLatch
用户事件,循环栅栏作用于线程。- 循环栅栏可重复使用,
CountDownLatch
则不能。