Skip to content
文章目录

jUC常见并发流程工具总结

使用示例

  1. 5个线程都完成工作后继续后续逻辑

用5个线程模拟5个工人工作,只有5个工人都完成工作后才能输出所有工作都完成。

java
public class WorkerCountDown {

    private static Logger logger = LoggerFactory.getLogger(WorkerCountDown.class);

    public static void main(String[] args) {
        int workerSize = 5;
        CountDownLatch workCount = new CountDownLatch(workerSize);
        ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);
        for (int i = 0; i < workerSize; i++) {
            final int workerNum = i;
            //5个工人输出完成工作后,扣减倒计时门闩数
            threadPool.submit(() -> {
                logger.info("worker[{}]完成手头的工作", workerNum);
                workCount.countDown();
            });
        }

        try {
            //阻塞当前线程(主线程)往后走,只有倒计时门闩变为0之后才能继续后续逻辑
            workCount.await();
        } catch (InterruptedException e) {
            logger.info("倒计时门闩阻塞失败,失败原因[{}]", e.getMessage(), e);
        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        logger.info("所有工人都完成手头的工作了");
    }
}

输出结果

java
[pool-1-thread-2] INFO com.guide.thread.base.WorkerCountDown - worker[1]完成手头的工作
[pool-1-thread-3] INFO com.guide.thread.base.WorkerCountDown - worker[2]完成手头的工作
[pool-1-thread-5] INFO com.guide.thread.base.WorkerCountDown - worker[4]完成手头的工作
[pool-1-thread-1] INFO com.guide.thread.base.WorkerCountDown - worker[0]完成手头的工作
[pool-1-thread-4] INFO com.guide.thread.base.WorkerCountDown - worker[3]完成手头的工作
[main] INFO com.guide.thread.base.WorkerCountDown - 所有工人都完成手头的工作了
  1. 模拟运动员赛跑

我们将一个个线程比作运动员,我们希望希望倒计时门闩变为0时,10个线程并发工作。

java
public class Racing {

    private static Logger logger = LoggerFactory.getLogger(Racing.class);

    public static void main(String[] args) {

        logger.info("百米跑比赛开始");

        int playerNum = 10;
        CountDownLatch gun = new CountDownLatch(1);
        ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
        for (int i = 0; i < playerNum; i++) {
            final int playNo = i;
            threadPool.submit(() -> {
                logger.info("[{}]号运动员已就绪", playNo);
                try {
                    gun.await();
                } catch (InterruptedException e) {
                    logger.info("[{}]号运动员线程阻塞失败,失败原因[{}]", playNo, e.getMessage(), e);
                }
                logger.info("[{}]号运动员已经到达重点", playNo);
            });
        }


        gun.countDown();

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        logger.info("百米赛跑已结束");

    }
}

输出结果

java
[main] INFO com.guide.thread.base.Racing - 百米跑比赛开始
[pool-1-thread-3] INFO com.guide.thread.base.Racing - [2]号运动员已就绪
[pool-1-thread-9] INFO com.guide.thread.base.Racing - [8]号运动员已就绪
[pool-1-thread-4] INFO com.guide.thread.base.Racing - [3]号运动员已就绪
[pool-1-thread-7] INFO com.guide.thread.base.Racing - [6]号运动员已就绪
[pool-1-thread-2] INFO com.guide.thread.base.Racing - [1]号运动员已就绪
[pool-1-thread-8] INFO com.guide.thread.base.Racing - [7]号运动员已就绪
[pool-1-thread-1] INFO com.guide.thread.base.Racing - [0]号运动员已就绪
[pool-1-thread-5] INFO com.guide.thread.base.Racing - [4]号运动员已就绪
[pool-1-thread-6] INFO com.guide.thread.base.Racing - [5]号运动员已就绪
[pool-1-thread-10] INFO com.guide.thread.base.Racing - [9]号运动员已就绪
[pool-1-thread-6] INFO com.guide.thread.base.Racing - [5]号运动员已经到达重点
[pool-1-thread-5] INFO com.guide.thread.base.Racing - [4]号运动员已经到达重点
[pool-1-thread-1] INFO com.guide.thread.base.Racing - [0]号运动员已经到达重点
[pool-1-thread-8] INFO com.guide.thread.base.Racing - [7]号运动员已经到达重点
[pool-1-thread-2] INFO com.guide.thread.base.Racing - [1]号运动员已经到达重点
[pool-1-thread-7] INFO com.guide.thread.base.Racing - [6]号运动员已经到达重点
[pool-1-thread-4] INFO com.guide.thread.base.Racing - [3]号运动员已经到达重点
[pool-1-thread-9] INFO com.guide.thread.base.Racing - [8]号运动员已经到达重点
[pool-1-thread-3] INFO com.guide.thread.base.Racing - [2]号运动员已经到达重点
[pool-1-thread-10] INFO com.guide.thread.base.Racing - [9]号运动员已经到达重点
[main] INFO com.guide.thread.base.Racing - 百米赛跑已结束

信号量常用于控制多线程争抢有些资源后才能工作的场景。例如我们给信号量设置资源总数为3,只有拿到3个资源的线程才能进行工作,其他线程必须等待使用使用资源的线程释放后才能进行争抢。

信号量基础使用示例

如下所示,我们设置信号量初始化3个许可证,并且设置为公平true,这就意味着资源被线程争抢完后,后续线程必须阻塞等待,只有前一个线程用完许可资源释放足量空闲的资源后,才能继续争抢。

在这里插入图片描述

java
public class SemaphoreDemo {

    //默认3个资源,并且公平争抢
    private static Semaphore semaphore = new Semaphore(3,true);


    private static Logger logger = LoggerFactory.getLogger(SemaphoreDemo.class);

    public static void main(String[] args) {
        int workSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(workSize);
        for (int i = 0; i < workSize; i++) {
            executorService.submit(() -> {
                try {
                    semaphore.acquire(3);
                    logger.info("进行业务逻辑处理");
                    semaphore.release(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }


    }
}

注意事项

  1. 获取和释放的时候都可以指定数量,但是要保持一致。
  2. 公平性设置为true会更加合理
  3. 并不必须由获取许可证的线程释放许可证。可以是A获取,B释放。

当A线程需要等待某个条件的时候,它就去执行condition.await()方法,一旦执行了await()方法,线程就会进入阻塞状态。

如果线程B执行condition.signal()方法,则JVM就会从被阻塞线程中找到等待该condition的线程。当线程A收到可执行信号的时候,他的线程状态就会变成Runnable可执行状态。

基础使用示例

java
public class ConditionObj {

    private static Logger logger = LoggerFactory.getLogger(ConditionObj.class);


    private ReentrantLock lock = new ReentrantLock();
    //条件对象,操控线程的等待和通知
    private Condition condition = lock.newCondition();

    public void waitCondition() throws InterruptedException {
        lock.lock();
        try {
            logger.info("等待达到条件后通知");
            condition.await();
            logger.info("收到通知,开始执行业务逻辑");
        } finally {
            lock.unlock();
            logger.info("执行完成,释放锁");
        }
    }


    public void notifyCondition() throws InterruptedException {
        lock.lock();
        try {
            logger.info("达到条件发起通知");
            condition.signal();
            logger.info("发起通知结束");
        } finally {
            lock.unlock();
            logger.info("发起通知执行完成,释放锁");
        }
    }


    public static void main(String[] args) throws InterruptedException {
        ConditionObj obj = new ConditionObj();

        new Thread(() -> {
            try {
                obj.waitCondition();
                //让出CPU时间片,交给主线程发起通知
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                logger.error("等待条件通知设置失败,失败原因 [{}]", e.getMessage(), e);
            }
        }).start();


        Thread.sleep(3000);
        obj.notifyCondition();
    }

}

基于条件对象完成生产者、消费者模式

我们假设用一个队列存放一波生产者生产的资源,当资源满了通知消费者消费。当消费者消费空了,通知生产者生产。

所以这时候使用condition控制流程最合适。所以我们要定义两个信号,分别为:

  1. 资源未满(notFull): 生产者唤醒,消费者挂起。
  2. 资源为空(notEmpty):生产者挂起,消费者唤醒。

在这里插入图片描述

最终示例代码如下

java
public class ProducerMode {
    private static Logger logger = LoggerFactory.getLogger(ProducerMode.class);
    //锁
    private static ReentrantLock lock = new ReentrantLock();
    // 资源未满
    private Condition notFull = lock.newCondition();
    //资源为空
    private Condition notEmpty = lock.newCondition();

    private Queue<Integer> queue = new PriorityQueue<>(10);
    private int queueMaxSize = 10;

    /**
     * 生产者
     */
    private class Producer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (queueMaxSize == queue.size()) {
                        logger.info("当前队列已满,通知消费者消费");
                        //等待不满条件触发
                        notFull.await();

                    }

                    queue.offer(1);
                    logger.info("生产者补货,当前队列有 【{}】", queue.size());
                    //通知消费者队列不空,可以消费
                    notEmpty.signal();
                } catch (Exception e) {
                    logger.error("生产者报错,失败原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }
    }


    /**
     * 消费者
     */
    private class Consumer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (0 == queue.size()) {
                        logger.info("当前队列已空,通知生产者补货");
                        //等待不空条件达到
                        notEmpty.await();

                    }

                    queue.poll();
                    //通知消费者不满了
                    notFull.signal();
                    logger.info("消费者完成消费,当前队列还剩余 【{}】个元素", queue.size());
                } catch (Exception e) {
                    logger.error("生产者报错,失败原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }
    }

    public static void main(String[] args) {
        ProducerMode mode = new ProducerMode();
        Producer producer = mode.new Producer();
        Consumer consumer = mode.new Consumer();
        producer.start();
        consumer.start();
    }
}

输出结果

java
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【1
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【2
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【3
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【4
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【5
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【6
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【7
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【8
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【9
[Thread-0] INFO com.guide.thread.base.ProducerMode - 生产者补货,当前队列有 【10
[Thread-0] INFO com.guide.thread.base.ProducerMode - 当前队列已满,通知消费者消费
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【9】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【8】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【7】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【6】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【5】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【4】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【3】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【2】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【1】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 消费者完成消费,当前队列还剩余 【0】个元素
[Thread-1] INFO com.guide.thread.base.ProducerMode - 当前队列已空,通知生产者补货

循环栅栏对象

直到指定数量的线程都到达同一个点,然后才一起继续执行。

基础示例

java
public class CyclicBarrierDemo {

    private static Logger logger = LoggerFactory.getLogger(CyclicBarrierDemo.class);


    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("到齐5个人,准备触发");
        });


        int poolSize = 10;
        ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);

        for (int i = 0; i < poolSize; i++) {
            threadPool.submit(() -> {
                String name = Thread.currentThread().getName();
                logger.info("线程[{}],已准备就绪,等待一波人员到期就出发", name);
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    logger.info("线程阻塞失败,失败原因 [{}]", e.getMessage(), e);
                }
                logger.info("已就绪5个线程,[{}]准备触发", name);
            });
        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }
    }


}

输出结果

java
[pool-1-thread-6] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-6],已准备就绪,等待一波人员到期就出发
[pool-1-thread-9] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-9],已准备就绪,等待一波人员到期就出发
[pool-1-thread-8] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-8],已准备就绪,等待一波人员到期就出发
[pool-1-thread-1] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-1],已准备就绪,等待一波人员到期就出发
[pool-1-thread-5] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-5],已准备就绪,等待一波人员到期就出发
[pool-1-thread-10] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-10],已准备就绪,等待一波人员到期就出发
[pool-1-thread-2] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-2],已准备就绪,等待一波人员到期就出发
[pool-1-thread-7] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-7],已准备就绪,等待一波人员到期就出发
[pool-1-thread-4] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-4],已准备就绪,等待一波人员到期就出发
[pool-1-thread-3] INFO com.guide.thread.base.CyclicBarrierDemo - 线程[pool-1-thread-3],已准备就绪,等待一波人员到期就出发
[pool-1-thread-5] INFO com.guide.thread.base.CyclicBarrierDemo - 到齐5个人,准备触发
[pool-1-thread-5] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-5]准备触发
[pool-1-thread-3] INFO com.guide.thread.base.CyclicBarrierDemo - 到齐5个人,准备触发
[pool-1-thread-3] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-3]准备触发
[pool-1-thread-6] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-6]准备触发
[pool-1-thread-9] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-9]准备触发
[pool-1-thread-1] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-1]准备触发
[pool-1-thread-8] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-8]准备触发
[pool-1-thread-2] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-2]准备触发
[pool-1-thread-7] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-7]准备触发
[pool-1-thread-10] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-10]准备触发
[pool-1-thread-4] INFO com.guide.thread.base.CyclicBarrierDemo - 已就绪5个线程,[pool-1-thread-4]准备触发
  1. CountDownLatch用户事件,循环栅栏作用于线程。
  2. 循环栅栏可重复使用,CountDownLatch则不能。

参考文献

控制并发流程