课程内容

JDK除了提供synchronized以外,还丰富了很多JUC工具类供我们在不同的场景中使用,主要的如:ReentrantLock,实现了可重入的独占锁;Semaphore信号量;CountDownLatch闭锁

*一、ReentrantLock(可重入的独占锁)

基本介绍

ReentrantLock是一种可重入的独占锁,它允许同一个线程多次获取同一个锁而不会被阻塞。
它的功能类似于synchronized是一种互斥锁,可以保证线程安全。

特点

相对于 synchronized, ReentrantLock具备如下特点:

  • 可中断
  • 可以设置超时时间
  • 可以切换为公平/非公平锁(默认非公平)
  • 可以尝试获取锁
  • 支持多个条件变量
  • 与synchronized一样,都支持可重入

应用场景

它的主要应用场景是在多线程环境下对共享资源进行独占式访问,以保证数据的一致性和安全性。从这点来看,他跟synchronized的使用场景区别不是很大
在这里插入图片描述
ReentrantLock具体应用场景如下:

  1. 解决多线程竞争资源的问题,例如多个线程同时对同一个数据库进行写操作,可以使用ReentrantLock保证每次只有一个线程能够写入。
  2. 实现多线程任务的顺序执行,例如在一个线程执行完某个任务后,再让另一个线程执行任务。
  3. 实现多线程等待/通知机制,例如在某个线程执行完某个任务后,通知其他线程继续执行任务。

与synchronized的区别

在这里插入图片描述

常用API

void lock(); // 获取锁,调用该方法当前线程会获取锁,当锁获得后,该方法返回
void unlock(); // 释放锁

void lockInterruptibly() throws InterruptedException; // 可中断的获取锁,和lock()方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程

boolean tryLock(); // 尝试非阻塞的获取锁,调用该方法后立即返回。如果能够获取到返回true,否则返回false

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 超时获取锁,当前线程在以下三种情况下会被返回:
当前线程在超时时间内获取了锁
当前线程在超时时间内被中断
超时时间结束,返回false

Condition newCondition(); // 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的await()方法,而调用后,当前线程将释放锁

基本语法

//加锁  阻塞 
lock.lock(); 
try {  
    ...
} finally { 
    // 解锁 
    lock.unlock();  
}


//尝试加锁   非阻塞
if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}

在使用时要注意 4 个问题:

  1. 默认情况下 ReentrantLock 为非公平锁而非公平锁;
  2. 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
  3. 加锁操作一定要放在 try 代码之前,这样可以避免未加锁成功又释放锁的异常;
  4. 释放锁一定要放在 finally 中,否则会导致线程阻塞

使用示例

1.独占锁:模拟抢票场景

场景:抢票。总共10个人,抢8张票
思考:理论上,肯定有两个人抢不到票对吧,那我们看看不加锁的代码结果会是如何
示例1:无锁版

public class ReentrantLockTest {

    // 票数
    public static int tickets = 8;

    // 总人数
    public static final int PERSONS = 10;

    public static void main(String[] args) {

        for (int i = 0; i < PERSONS; i++) {
            new Thread(() -> {

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if(tickets > 0) {
                    System.out.println("我是" + Thread.currentThread().getName() + ",我来抢第【" + tickets-- + "】张票");
                } else {
                    System.out.println("我是" + Thread.currentThread().getName() + ",票卖完了我没抢到");
                }
            }).start();
        }

//        系统输出内容:
//        我是Thread-1,我来抢第【8】张票
//        我是Thread-8,我来抢第【4】张票
//        我是Thread-2,我来抢第【8】张票
//        我是Thread-0,我来抢第【3】张票
//        我是Thread-7,我来抢第【6】张票
//        我是Thread-3,我来抢第【7】张票
//        我是Thread-9,我来抢第【8】张票
//        我是Thread-6,我来抢第【8】张票
//        我是Thread-5,我来抢第【5】张票
//        我是Thread-4,我来抢第【8】张票
    }
}

大家看看最后的输出打印结果,第【8】张票同时卖给了4、6、9线程。这显然是不对的。我们这里加上ReentrantLock看看,结果会是怎样,代码示例如下:

public class ReentrantLockTest {

    // 票数
    public static int tickets = 8;

    // 总人数
    public static final int PERSONS = 10;

    public static final Lock LOCK = new ReentrantLock();


    public static void main(String[] args) {

        for (int i = 0; i < PERSONS; i++) {
            new Thread(() -> {
                buyTicket();
            }).start();
        }
    }

    public static void buyTicket() {
        // 获取锁
        LOCK.lock();
        try {
            Thread.sleep(1000);

            if(tickets > 0) {
                System.out.println("我是" + Thread.currentThread().getName() + ",我来抢第【" + tickets-- + "】张票");
            } else {
                System.out.println("我是" + Thread.currentThread().getName() + ",票卖完了我没抢到");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {

            // 释放说
            LOCK.unlock();
        }
    }

//    系统输出如下:
//    我是Thread-0,我来抢第【8】张票
//    我是Thread-1,我来抢第【7】张票
//    我是Thread-2,我来抢第【6】张票
//    我是Thread-3,我来抢第【5】张票
//    我是Thread-4,我来抢第【4】张票
//    我是Thread-5,我来抢第【3】张票
//    我是Thread-9,我来抢第【2】张票
//    我是Thread-7,我来抢第【1】张票
//    我是Thread-8,票卖完了我没抢到
//    我是Thread-6,票卖完了我没抢到
}

看最后的输出结果,销售正常了

2.公平锁和非公平锁

ReentantLock公平锁跟非公平锁的新建方式如下:

ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁  
ReentrantLock lock = new ReentrantLock(true); //公平锁  

那什么是公平锁,什么是非公平锁?

  • 公平锁:线程在获取锁时,按照等待的先后顺序获取锁;
  • 非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock默认是非公平锁

总的来说,非公平锁,就是在请求获取锁的瞬间,有一次能马上获得锁的机会(插队),获得成功直接拿锁,获取不成功就排队等候。这里就不写示例了

3.可重入锁

可重入锁又名递归锁,是指同一个线程在获得锁之后,那么它可以再前一次锁了之后,重复(继续)获取这把锁(不可重入的话,就算是相同的线程来获取,也会被挡在外面)。Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。在实际开发中,可重入锁常常应用于递归操作、调用同一个类中的其他方法、锁嵌套等场景中。

public class ReentrantLockTest {

    // 创建 ReentrantLock 对象
    private final ReentrantLock lock = new ReentrantLock();

    public void recursiveCall(int num) {

        // 获取锁
        lock.lock();
        try {
            if (num == 0) {
                return;
            }
            System.out.println("执行递归,num = " + num);

            // 自己调用自己
            recursiveCall(num - 1);
        } finally {

            // 释放锁
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        // 创建计数器对象
        ReentrantLockTest lockTest = new ReentrantLockTest();

        // 测试递归调用
        lockTest.recursiveCall(10);
    }
}
4.结合Condition实现生产者消费者模式

java.util.concurrent类库中提供Condition类来实现线程之间的协调。调用Condition.await() 方法使线程等待,其他线程调用Condition.signal() 或 Condition.signalAll() 方法唤醒等待的线程。
注意:调用Condition的await()和signal()方法,都必须在lock之内(同Object的wait跟notify要在synchronized一样)
案例:基于ReentrantLock和Condition实现一个简单队列

  • 场景:有一个库存容量为1的容器,提供了put()方法增加库存;take()方法减去库存。生产者先生产,库存满了之后,生产者停下,并且通知消费者来消费;消费者收到通知后,开始消费,消费完所有库存后,通知生产者接着生产
  • 思路:一个Condition代表一种状态,库存有两种状态【满】、【空】
public class ReentrantLockTest {
    private static StockManage stockManage = new StockManage(1);

    public static void main(String[] args) {

        // 启动【生产者】线程
        Producer producer = new Producer();
        producer.setName("生产者");
        producer.start();

        // 启动【消费者】线程
        Consumer consumer = new Consumer();
        consumer.setName("消费者");
        consumer.start();
    }

    /**
     * 消费者
     */
    public static class Consumer extends Thread {

        @Override
        public void run() {
            try {
                // 隔2秒轮询消费一次
                while (true) {
                    Thread.sleep(2000);
                    System.out.println("consumer消费:" + stockManage.take());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 生产者
     */
    public static class Producer extends Thread {

        @Override
        public void run() {
            try {
                // 隔1秒轮询生产一次
                while (true) {
                    Thread.sleep(1000);
                    stockManage.put(new Random().nextInt(1000));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class StockManage {
        private Object[] items;
        int size = 0;
        int takeIndex;
        int putIndex;
        private ReentrantLock lock;

        // 消费者线程阻塞唤醒条件,队列为空阻塞,生产者生产完唤醒
        public Condition notEmpty;

        // 生产者线程阻塞唤醒条件,队列满了阻塞,消费者消费完唤醒
        public Condition notFull;

        public StockManage(int capacity) {
            this.items = new Object[capacity];
            lock = new ReentrantLock();
            notEmpty = lock.newCondition();
            notFull = lock.newCondition();
        }

        /**
         * 【生产者】增加库存
         */
        public void put(Object value) throws Exception {

            // 加锁
            lock.lock();
            try {
                while (size == items.length) {

                    // 队列满了让生产者等待
                    notFull.await();
                }

                // 这里是一个典型环形数组思路
                items[putIndex] = value;
                if (++putIndex == items.length) {
                    putIndex = 0;
                }
                size++;

                // 生产完,使用【notEmpty】条件唤醒消费者
                notEmpty.signal();

            } finally {
                System.out.println("producer生产:" + value);

                //解锁
                lock.unlock();
            }
        }

        /**
         * 【消费者】拿库存
         */
        public Object take() throws Exception {
            lock.lock();
            try {

                while (size == 0) {

                    // 队列空了就让消费者等待
                    notEmpty.await();
                }

                Object value = items[takeIndex];
                items[takeIndex] = null;
                if (++takeIndex == items.length) {
                    takeIndex = 0;
                }
                size--;

                // 消费完,使用【notFull】条件来唤醒生产者继续生产
                notFull.signal();
                return value;
            } finally {
                lock.unlock();
            }
        }
    }
//    系统输出如下:
//    producer生产:190
//    consumer消费:190
//    producer生产:775
//    consumer消费:775
//    producer生产:188
//    consumer消费:188
//    剩下的打印省略....
}

*二、Semaphore(信号量)

基本介绍

Semaphore(信号量)是一种用于多线程编程的同步工具,用于控制同时访问某个资源的线程数量。它通过维护若干个许可证来控制线程对资源的访问。当许可证数量大于零时,线程可以访问;反之,则拒绝,并阻塞线程让其等待。许可证的数量,就是最多可访问线程数。

典型应用

  • 各种连接池
  • 服务限流

常用API

构造方法:
在这里插入图片描述

permits 表示许可证的数量(资源数)
fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

获取、释放资源:

acquire() 表示阻塞并获取许可
tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
release() 表示释放许可

使用示例

  • 场景:某快家庭幸福型企业为了给予员工幸福体验,决定公司餐厅不再提供餐具,转而让员工自带餐具,并且要求自己吃完饭后洗碗。由于地方有限,修的洗碗池只能同时容纳5人洗碗。但是公司有100+人吃完饭后在等待洗碗。
  • 100+人竞争,但是可供共享的资源只有5个

代码1【限流】:(幸福家庭型企业的洗碗池)

public class SemaphoreTest {


    // 【洗碗池】数量
    private static Semaphore washPool = new Semaphore(5);

    // 【员工】数量
    private static final int EMP_COUNT = 108;

    public static void main(String[] args) {
        for (int i = 0; i < EMP_COUNT; i++) {
            Thread thread = new Thread(() -> {
                try {
                    washPool.acquire();
                    System.out.println("我是【" + Thread.currentThread().getName() + "】,我占到洗碗池了");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("我是【" + Thread.currentThread().getName() + "】,我洗完啦,溜了溜了");
                    washPool.release();
                }
            }, "员工_" + i);
            thread.start();
        }
    }
//    系统输出如下:
//    我是【员工_2】,我占到洗碗池了
//    我是【员工_3】,我占到洗碗池了
//    我是【员工_0】,我占到洗碗池了
//    我是【员工_1】,我占到洗碗池了
//    我是【员工_4】,我占到洗碗池了
//    我是【员工_1】,我洗完啦,溜了溜了
//    我是【员工_4】,我洗完啦,溜了溜了
//    我是【员工_0】,我洗完啦,溜了溜了
//    我是【员工_3】,我洗完啦,溜了溜了
//    我是【员工_2】,我洗完啦,溜了溜了
//    省略剩下的打印.................
}

示例2【资源池】:(模拟数据库连接池的简单示例)

package org.tuling.juc.sema;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author zhanghuitong
 * @Date 2023/7/19 9:01
 * @slogan 编码即学习
 **/
public class SimulateThreadPoolTest {
    public static void main(String[] args) {

        // 初始化连接池,并且设置连接池数为:2
        final ConnectPool connectPool = new ConnectPool(2);
        ExecutorService executorService = Executors.newCachedThreadPool();

        // 模拟5个线程来并发争抢连接资源
        final int requireConnectCount = 5;
        for (int i = 0; i < requireConnectCount; i++) {
            final String name = "线程_" + i;
            new Thread(()->{
                Connect connect = null;
                try {
                    System.out.println("线程" + Thread.currentThread().getName() + "申请获取数据库连接池");
                    connect = connectPool.openConnect();
                    System.out.println("线程" + Thread.currentThread().getName() + "成功拿到数据库连接" + connect);

                    // 这里大概会做一些数据库操作,接着释放
                    Thread.sleep(2000);
                    System.out.println("线程" + Thread.currentThread().getName() + "释放数据库连接" + connect);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    connectPool.releaseConnect(connect);
                }
            }, name).start();
        }
    }
}

/**
 * 连接池,管理连接的地方
 */
class ConnectPool {

    /**
     * 连接池大小,即最大可同时连接数量
     */
    private int size;
    private Connect[] connects;

    /**
     * 记录对应下标的Connect是否已被使用
     */
    private boolean[] connectFlag;

    /**
     * 信号量对象
     */
    private Semaphore semaphore;

    /**
     * size:初始化连接池大小
     */
    public ConnectPool(int size) {
        this.size = size;
        semaphore = new Semaphore(size, true);
        connects = new Connect[size];
        connectFlag = new boolean[size];
        initConnects();
    }

    private void initConnects() {
        for (int i = 0; i < this.size; i++) {
            connects[i] = new Connect(i);
        }
    }

    /**
     * 获取数据库连接
     */
    public Connect openConnect() throws InterruptedException {
        semaphore.acquire();
        return doGetConnect();
    }

    /**
     * 获取连接
     */
    private synchronized Connect doGetConnect() {
        for (int i = 0; i < connectFlag.length; i++) {

            if (!connectFlag[i]) {
                
                // 标记该连接已被使用
                connectFlag[i] = true;
                return connects[i];
            }
        }

        return null;
    }

    /**
     * 释放连接
     */
    public synchronized void releaseConnect(Connect connect) {
        if (connect == null) {
            return;
        }

        for (int i = 0; i < this.size; i++) {
            if (connect == connects[i]) {
                connectFlag[i] = false;
                semaphore.release();
            }
        }
    }
}

/**
 * 连接池中的连接对象
 */
class Connect {

    /**
     * 下面这两个变量只是用来生成id的
     * 代表这个链接
     */
    private int id;

    public Connect(int id) {

        this.id = id;
        // 我们知道打开一个连接很耗费资源的,需要等待1秒钟
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("连接#" + id + "#已与数据库建立通道!");
    }

    @Override
    public String toString() {
        return "Connect{" +
                "id=" + id +
                '}';
    }
}

// 系统输出如下:
// 连接#0#已与数据库建立通道!
// 连接#1#已与数据库建立通道!
// 线程线程_0申请获取数据库连接池
// 线程线程_4申请获取数据库连接池
// 线程线程_1申请获取数据库连接池
// 线程线程_4成功拿到数据库连接Connect{id=1}
// 线程线程_3申请获取数据库连接池
// 线程线程_0成功拿到数据库连接Connect{id=0}
// 线程线程_2申请获取数据库连接池
// 线程线程_4释放数据库连接Connect{id=1}
// 线程线程_0释放数据库连接Connect{id=0}
// 线程线程_1成功拿到数据库连接Connect{id=0}
// 线程线程_3成功拿到数据库连接Connect{id=1}
// 线程线程_3释放数据库连接Connect{id=1}
// 线程线程_1释放数据库连接Connect{id=0}
// 线程线程_2成功拿到数据库连接Connect{id=0}
// 线程线程_2释放数据库连接Connect{id=0}

使用总结

以下是一些使用Semaphore的常见场景:

  1. 限流:Semaphore可以用于限制对共享资源的并发访问数量,以控制系统的流量。
  2. 资源池:Semaphore可以用于实现资源池,以维护一组有限的共享资源。

*三、CountDownLatch(闭锁)

基本介绍

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。CountDownLatch内部维护了一个计数器,初始值为线程的数量。每有一个线程完成了任务,计数器的值减1,当计数器值为0时,代表所有线程已经完成了任务。
在这里插入图片描述

典型应用

LOL游戏载入中的时候,需要等到10个人都100%加载成功了,才可以进入游戏

常用API

构造方法:
在这里插入图片描述

count: 初始化要等待的线程数量,即:计数器数量

获取、释放资源:

await(); // 使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒
await(long timeout, TimeUnit unit); // 带超时时间的await()
countDown(); // 使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程
getCount(); // 获得latch的数值

使用示例

示例1:(LOL/王者游戏加载,m个线程等待n个线程完成之后继续执行)

  • 场景:所有玩家选择完英雄后,开始进入游戏加载页面。所有玩家加载完后,进入游戏,游戏聊天广播线程也开始载入
  • 思路:英雄联盟加载游戏可以视为1个主线程,10个玩家为加载子线程。只有10个玩家都加载至100%后,主线程才能继续运行游戏,然后聊天广播线程也开始运行。所以
public class LOLLoadingTest {

    /**
     * LOL游戏总人数
     */
    private static final int TOTAL_PLAYERS = 10;

    /**
     * 游戏主线程等待玩家加载线程
     */
    private static CountDownLatch gameWaitAllPlayerLoaded = new CountDownLatch(TOTAL_PLAYERS);

    public static void main(String[] args) throws InterruptedException {

        System.out.println("所有玩家已经选择完毕,游戏载入中");
        LOLChatBroadcast lolChatBroadcast = new LOLChatBroadcast();
        lolChatBroadcast.start();
        for (int i = 0; i < TOTAL_PLAYERS; i++) {
            final String name = "玩家_" + (i + 1);
            LOLPlayer lolPlayer = new LOLPlayer();
            lolPlayer.setName(name);
            lolPlayer.start();
        }

        gameWaitAllPlayerLoaded.await();
        System.out.println("所有玩家都已载入成功");
        System.out.println("欢迎来到英雄联盟!!!!!");
    }

    /**
     * 游戏聊天广播线程
     */
    public static class LOLChatBroadcast extends Thread {

        @Override
        public void run() {
            System.out.println("我是聊天广播线程,等待游戏载入");
            try {
                gameWaitAllPlayerLoaded.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("游戏载入完毕,开始聊天广播");
            System.out.println("请大家文明发言,建设文明游戏");
        }
    }

	/**
     * 游戏玩家加载线程
     */
    public static class LOLPlayer extends Thread {

        @Override
        public void run() {
            System.out.println("我是【" + Thread.currentThread().getName() + "】,已加载0%");

            // 睡个几秒钟
            try {
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("我是【" + Thread.currentThread().getName() + "】,已加载100%");
            gameWaitAllPlayerLoaded.countDown();
        }
    }
//    系统输出:
//    所有玩家已经选择完毕,游戏载入中
//    我是聊天广播线程,等待游戏载入
//    我是【玩家_1】,已加载0%
//    我是【玩家_2】,已加载0%
//    我是【玩家_3】,已加载0%
//    我是【玩家_4】,已加载0%
//    我是【玩家_5】,已加载0%
//    我是【玩家_6】,已加载0%
//    我是【玩家_7】,已加载0%
//    我是【玩家_8】,已加载0%
//    我是【玩家_9】,已加载0%
//    我是【玩家_10】,已加载0%
//    我是【玩家_8】,已加载100%
//    我是【玩家_3】,已加载100%
//    我是【玩家_7】,已加载100%
//    我是【玩家_1】,已加载100%
//    我是【玩家_5】,已加载100%
//    我是【玩家_4】,已加载100%
//    我是【玩家_9】,已加载100%
//    我是【玩家_2】,已加载100%
//    我是【玩家_6】,已加载100%
//    我是【玩家_10】,已加载100%
//    游戏载入完毕,开始聊天广播
//    请大家文明发言,建设文明游戏
//    所有玩家都已载入成功
//    欢迎来到英雄联盟!!!!!
}

示例2:(百米赛跑)

  • 场景:有8名选手参加百米赛跑,8名参赛选手,需要等待哨子声响,才可以起跑;裁判也只能等8名选手都到达终点之后才能宣布比赛结束
  • 思路:8名运动员需要等1个哨声响起,才能起跑,这是第1个计数器(m个线程等待1个线程执行完才能继续执行);裁判需要等8名运动员跑完,才能结束比赛,宣布结果(1个线程等待m个线程执行完才能继续执行)
public class CountDownLatchTest {

    /**
     * 运动员等待裁判哨声响起,哨子只会响一次
     */
    private static CountDownLatch athleteWaitWhistleBlows = new CountDownLatch(1);

    /**
     * 裁判等待8名远动员跑完的计数器
     */
    private static CountDownLatch wait8AthletesArrival = new CountDownLatch(8);


    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 8; i++) {
            final String name = "运动员_" + i;
            new Thread(() -> {

                System.out.println("我是运动员【" + Thread.currentThread().getName() + "】,我已经做好准备了");
                try {
                    athleteWaitWhistleBlows.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("运动员【" + Thread.currentThread().getName() + "】冲出去了!!!");
                try {
                    int i1 = new Random().nextInt(1000);
                    Thread.sleep(1000 + i1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("运动员【" + Thread.currentThread().getName() + "】到达了终点!!!");
                wait8AthletesArrival.countDown();
            }, name).start();
        }

        Thread.sleep(1000);
        System.out.println("运动员已经全部入场,裁判准备吹哨");
        Thread.sleep(3000);
        System.out.println("吹哨:哔哔哔");
        athleteWaitWhistleBlows.countDown();
        System.out.println("裁判在观察,等待所有运动远跑完");
        wait8AthletesArrival.await();
        System.out.println("比赛结束");
    }

//    系统输出:
//    我是运动员【运动员_0】,我已经做好准备了
//    我是运动员【运动员_3】,我已经做好准备了
//    我是运动员【运动员_2】,我已经做好准备了
//    我是运动员【运动员_1】,我已经做好准备了
//    我是运动员【运动员_5】,我已经做好准备了
//    我是运动员【运动员_4】,我已经做好准备了
//    我是运动员【运动员_6】,我已经做好准备了
//    我是运动员【运动员_7】,我已经做好准备了
//    运动员已经全部入场,裁判准备吹哨
//    吹哨:哔哔哔
//    裁判在观察,等待所有运动远跑完
//    运动员【运动员_3】冲出去了!!!
//    运动员【运动员_2】冲出去了!!!
//    运动员【运动员_7】冲出去了!!!
//    运动员【运动员_0】冲出去了!!!
//    运动员【运动员_6】冲出去了!!!
//    运动员【运动员_4】冲出去了!!!
//    运动员【运动员_5】冲出去了!!!
//    运动员【运动员_1】冲出去了!!!
//    运动员【运动员_6】到达了终点!!!
//    运动员【运动员_4】到达了终点!!!
//    运动员【运动员_0】到达了终点!!!
//    运动员【运动员_7】到达了终点!!!
//    运动员【运动员_3】到达了终点!!!
//    运动员【运动员_5】到达了终点!!!
//    运动员【运动员_2】到达了终点!!!
//    运动员【运动员_1】到达了终点!!!
//    比赛结束
}

示例3:(多任务完成后合并汇总)
场景:很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check。

public static void main(String[] args) throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000));
                    System.out.println("任务" + index +"执行完成");
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 主线程在阻塞,当计数器为0,就唤醒主线程往下执行
        countDownLatch.await();
        System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
    }
    
//    系统输出:
//    任务4执行完成 
//    任务2执行完成
//    任务1执行完成
//    任务0执行完成
//    任务3执行完成
//    主线程:在所有任务运行完成后,进行结果汇总

使用总结

以下是使用CountDownLatch的常见场景:

  1. 并行任务同步:CountDownLatch可以用于协调多个并行任务的完成情况,确保所有任务都完成后再继续执行下一步操作。
  2. 多任务汇总:CountDownLatch可以用于统计多个线程的完成情况,以确定所有线程都已完成工作。
  3. 资源初始化:CountDownLatch可以用于等待资源的初始化完成,以便在资源初始化完成后开始使用

四、CyclicBarrier(循环屏障)

基本介绍

CyclicBarrier(循环屏障),是 Java 并发库中的一个同步工具,它跟上面说到的CountDownLatch很相似,都可以使一批线程在等待之后执行。根据官方的释义,CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。另外,CyclicBarrier侧重于可以重复使用的栅栏,不像是CountDownLatch,是一次性的。

常用API

构造器:

public CyclicBarrier(int parties); // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

public CyclicBarrier(int parties, Runnable barrierAction); // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)

在这里插入图片描述
普通方法:

// 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;

public void reset(); // 循环 通过reset()方法可以进行重置

使用示例

示例1:(模拟人满发车)

  • 场景:这里有一辆车,司机很有个性,我人满才发车,人不满,我是不会动的
  • 思路:人满才发车,发车的动作是一致的,但是人不一定是之前的人(重置发车这个操作)
public class CyclicBarrierDemo {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(5);

         // 人满之后的操作
        Runnable startTheCarWhileCarIsFull = new Runnable() {
            @Override
            public void run() {
                System.out.println("人齐了是吧,那我开车啦");
            }
        };

        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, startTheCarWhileCarIsFull);

        for (int i = 0; i < 10; i++) {
            final int id = i + 1;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(id + "号马上就到");
                        int sleepMills = ThreadLocalRandom.current().nextInt(2000);
                        Thread.sleep(sleepMills);
                        System.out.println(id + "号到了,上车");
                        cyclicBarrier.await();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

//    系统输出:
//    1号马上就到
//    5号马上就到
//    4号马上就到
//    3号马上就到
//    2号马上就到
//    4号到了,上车
//    1号到了,上车
//    5号到了,上车
//    2号到了,上车
//    3号到了,上车
//    人齐了,准备发车
//    6号马上就到
//    9号马上就到
//    7号马上就到
//    8号马上就到
//    10号马上就到
//    7号到了,上车
//    6号到了,上车
//    10号到了,上车
//    8号到了,上车
//    9号到了,上车
//    人齐了,准备发车
}

示例2:(多线程批量处理数据)

public class CyclicBarrierBatchProcessorDemo {

    public static void main(String[] args) {
        //生成数据
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 50; i++) {
            data.add(i);
        }

        //指定数据处理大小
        int batchSize = 5;
        CyclicBarrierBatchProcessor processor = new CyclicBarrierBatchProcessor(data, batchSize);
        //处理数据
        processor.process(batchData -> {
            for (Integer i : batchData) {
                System.out.println(Thread.currentThread().getName() + "处理数据" + i);
            }
        });
    }
}


class CyclicBarrierBatchProcessor {
    private List<Integer> data;
    private int batchSize;
    private CyclicBarrier barrier;
    private List<Thread> threads;

    public CyclicBarrierBatchProcessor(List<Integer> data, int batchSize) {
        this.data = data;
        this.batchSize = batchSize;
        this.barrier = new CyclicBarrier(batchSize);
        this.threads = new ArrayList<>();
    }

    public void process(BatchTask task) {
        // 对任务分批,获取线程数
        int threadCount = (data.size() + batchSize - 1) / batchSize;
        for (int i = 0; i < threadCount; i++) {
            int start = i * batchSize;
            int end = Math.min(start + batchSize, data.size());
            //获取每个线程处理的任务数
            List<Integer> batchData = data.subList(start, end);
            Thread thread = new Thread(() -> {
                task.process(batchData);
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            threads.add(thread);
            thread.start();
        }

    }

    public interface BatchTask {
        void process(List<Integer> batchData);
    }
//    系统输出:
//    Thread-1处理数据6
//    Thread-5处理数据26
//    Thread-4处理数据21
//    Thread-4处理数据22
//    Thread-4处理数据23
//    Thread-7处理数据36
//    Thread-7处理数据37
//    Thread-2处理数据11
//    Thread-3处理数据16
//    Thread-3处理数据17
//    Thread-3处理数据18
//    Thread-3处理数据19
//    Thread-3处理数据20
//    Thread-0处理数据1
//    Thread-0处理数据2
//    Thread-0处理数据3
//    Thread-0处理数据4
//    Thread-0处理数据5
//    Thread-2处理数据12
//    Thread-2处理数据13
//    Thread-7处理数据38
//    Thread-7处理数据39
//    Thread-7处理数据40
//    Thread-4处理数据24
//    Thread-4处理数据25
//    Thread-9处理数据46
//    Thread-9处理数据47
//    Thread-8处理数据41
//    Thread-8处理数据42
//    Thread-8处理数据43
//    Thread-8处理数据44
//    Thread-8处理数据45
//    Thread-5处理数据27
//    Thread-5处理数据28
//    Thread-5处理数据29
//    Thread-5处理数据30
//    Thread-6处理数据31
//    Thread-6处理数据32
//    Thread-6处理数据33
//    Thread-1处理数据7
//    Thread-1处理数据8
//    Thread-6处理数据34
//    Thread-6处理数据35
//    Thread-9处理数据48
//    Thread-2处理数据14
//    Thread-9处理数据49
//    Thread-1处理数据9
//    Thread-9处理数据50
//    Thread-2处理数据15
//    Thread-1处理数据10
}

使用场景

以下是一些常见的 CyclicBarrier 应用场景:

  1. 多线程任务:CyclicBarrier 可以用于将复杂的任务分配给多个线程执行,并在所有线程完成工作后触发后续操作。
  2. 数据处理:CyclicBarrier 可以用于协调多个线程间的数据处理,在所有线程处理完数据后触发后续操作。

五、Exchanger(数据交换器)

基本介绍

Exchanger是一个用于线程间协作的工具类,用于两个线程间交换数据。具体交换数据是通过exchange方法来实现的,如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。
在这里插入图片描述

常用API

public V exchange(V x) throws InterruptedException; // 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象

public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException; // 等待另一个线程到达此交换点,或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常,然后将给定的对象传送给该线程,并接收该线程的对象。

使用示例

示例1:(一手交钱一手交货)

  • 场景:我有一台闲置的电脑,刚好有个人想买,于是我们约在了瑞幸咖啡店相约交易
  • 思路:卖家拿上电脑,去约好的瑞幸咖啡店;买家也是。 这里就有一个交易地点:瑞幸咖啡店
public class ExchangerDemo {

    /**
     * 交易地点:瑞幸咖啡店
     */
    private static Exchanger luckinCoffeeShop = new Exchanger();
    static String goods = "电脑";
    static String money = "¥3000";
    public static void main(String[] args) throws InterruptedException {

        System.out.println("准备交易,一手交钱一手交货...");
        // 卖家
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("卖家到瑞幸咖啡店了,已经准备好货:" + goods);
                try {
                    String money = (String) luckinCoffeeShop.exchange(goods);
                    System.out.println("卖家收到钱:" + money);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        Thread.sleep(3000);

        // 买家
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("买家到瑞幸咖啡店了,已经准备好钱:" + money);
                    String goods = (String) luckinCoffeeShop.exchange(money);
                    System.out.println("买家收到货:" + goods);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
//    系统输出:
//    准备交易,一手交钱一手交货...
//    卖家到瑞幸咖啡店了,已经准备好货:电脑
//    买家到瑞幸咖啡店了,已经准备好钱:¥3000
//    买家收到货:电脑
//    卖家收到钱:¥3000
}

示例2:(模拟对账场景)

public class ExchangerDemo2 {

    private static final Exchanger<String> exchanger = new Exchanger();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "12379871924sfkhfksdhfks";
                    exchanger.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "32423423jknjkfsbfj";
                    String A = exchanger.exchange(B);
                    System.out.println("A和B数据是否一致:" + A.equals(B));
                    System.out.println("A= "+A);
                    System.out.println("B= "+B);
                } catch (InterruptedException e) {
                }
            }
        });

        threadPool.shutdown();

    }

//    系统输出:
//    A和B数据是否一致:false
//    A= 12379871924sfkhfksdhfks
//    B= 32423423jknjkfsbfj
}

从上面两个例子可以看出,使用上并不困难,重点是要确认交换器必须是同一个。

使用总结

Exchanger 可以用于各种应用场景,具体取决于具体的 Exchanger 实现。常见的场景包括:

  1. 数据交换:在多线程环境中,两个线程可以通过 Exchanger 进行数据交换。
  2. 数据采集:在数据采集系统中,可以使用 Exchanger 在采集线程和处理线程间进行数据交换。

六、Phaser(阶段协同器)

基本介绍

Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行。Phaser可以被视为CyclicBarrier和CountDownLatch的进化版。
对Phaser阶段协同器的理解,Phaser适用于多个线程协作的任务,分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与某个阶段;当一个阶段中所有任务都成功完成后,Phaser的onAdvance()被调用,然后Phaser释放等待线程,自动进入下个阶段。如此循环,直到Phaser不再包含任何参与者。
它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行。,它能够自适应地调整并发线程数,可以动态地增加或减少参与线程的数量。所以Phaser特别适合使用在重复执行或者重用的情况。
在这里插入图片描述
在这里插入图片描述
上面的两张图其实表达的意思是一样的,那就是表现出了多屏障,并且线程数量在不同阶段的动态修改性。另外,相对于CyclicBarrier和CountDownLatch,它还具有如下特性:

特性一:CountDownLatch、CyclicBarrier只适用于固定数量的参与者,而Phaser适用于可变数目的屏障
特性二:Phaser可能是分层的,这允许你以树形结构来安排移相器Phaser以减少竞争。
特性三:Phaser使用独立的对象可以监视Phaser的当前状态,监视器可以查询注册到Phaser的参与者的数量,以及已经到达和还没有到达某个特定相数的参与者的数量。

常用API

构造方法:

Phaser(); // 参与任务数0
Phaser(int parties); // 指定初始参与任务数,即线程数
Phaser(Phaser parent); // 指定parent阶段器, 子对象作为一个整体加入parent对象, 当子对象中没有参与者时,会自动从parent对象解除注册
Phaser(Phaser parent,int parties); // 集合上面两个方法

增、减参与任务数方法:

int register(); // 增加一个任务数,返回当前阶段号。
int bulkRegister(int parties); // 增加指定任务个数,返回当前阶段号。
int arriveAndDeregister(); // 减少一个任务数,返回当前阶段号。

到达、等待方法:

int arrive(); // 到达(任务完成),返回当前阶段号。
int arriveAndAwaitAdvance(); // 到达后等待其他任务到达,返回到达阶段号。
int awaitAdvance(int phase); // 在指定阶段等待(必须是当前阶段才有效)
int awaitAdvanceInterruptibly(int phase); // 阶段到达触发动作
int awaitAdvanceInterruptiBly(int phase,long timeout,TimeUnit unit)
protected boolean onAdvance(int phase,int registeredParties); // 类似CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作,该方法返回true将终结Phaser对象。

注意,上面说的parties或者任务数,就是指参加的线程数

使用示例

示例1:(代替CountDownLatch,实现LOL加载游戏)
代码示例如下:

public class PhaserLOLTest {

    /**
     * LOL游戏总人数
     */
    private static final int TOTAL_PLAYERS = 10;

    /**
     * 游戏主线程等待玩家加载线程
     */
    private static Phaser gameWaitAllPlayerLoaded = new Phaser(TOTAL_PLAYERS);

    public static void main(String[] args) throws InterruptedException {

        System.out.println("所有玩家已经选择完毕,游戏载入中");
        LOLChatBroadcast lolChatBroadcast = new LOLChatBroadcast();
        lolChatBroadcast.start();
        for (int i = 0; i < TOTAL_PLAYERS; i++) {
            final String name = "玩家_" + (i + 1);
            LOLPlayer lolPlayer = new LOLPlayer();
            lolPlayer.setName(name);
            lolPlayer.start();
        }

        gameWaitAllPlayerLoaded.awaitAdvance(gameWaitAllPlayerLoaded.getPhase());
        System.out.println("所有玩家都已载入成功");
        System.out.println("欢迎来到英雄联盟!!!!!");
    }

    /**
     * 游戏聊天广播线程
     */
    public static class LOLChatBroadcast extends Thread {

        @Override
        public void run() {
            System.out.println("我是聊天广播线程,等待游戏载入");
            gameWaitAllPlayerLoaded.awaitAdvance(gameWaitAllPlayerLoaded.getPhase());
            System.out.println("游戏载入完毕,开始聊天广播");
            System.out.println("请大家文明发言,建设文明游戏");
        }
    }

    /**
     * 游戏玩家加载线程
     */
    public static class LOLPlayer extends Thread {

        @Override
        public void run() {
            System.out.println("我是【" + Thread.currentThread().getName() + "】,已加载0%");

            // 睡个几秒钟
            try {
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("我是【" + Thread.currentThread().getName() + "】,已加载100%");
            gameWaitAllPlayerLoaded.arrive();
        }
    }
}

示例2:(用Phaser模拟百米赛跑)

  • 场景:有8名选手参加百米赛跑。本次赛跑分为3个阶段。阶段1运动员热完身后在起跑线等待裁判号令枪一响就冲出去;阶段2运动员开始跑步;阶段3运动员们等待裁判宣布结果退场
  • 思路:因为有8名远动员参加,所以需要设置Phaser的初始化parties为8;然后有3个阶段,所以运动员会使用3次Phaser的等待方法
public class PhaserRunningRaceTest {

    private static Phaser runningRacePhaser = new Phaser(8);

    public static void main(String[] args) {
        for (int i = 0; i < 8; i++) {
            final String name = "运动员_" + i;
            Athlet athlet = new Athlet();
            athlet.setName(name);
            athlet.start();
        }
    }

    public static class Athlet extends Thread {

        @Override
        public void run() {

            // 阶段1:等待裁判吹哨出发
            System.out.println("【" + Thread.currentThread().getName() + "】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【" + runningRacePhaser.getPhase() + "】阶段");
            runningRacePhaser.arriveAndAwaitAdvance();

            // 阶段2:开炮
            System.out.println("【" + Thread.currentThread().getName() + "】听到枪声响了,冲啊!!!!!这是第【" + runningRacePhaser.getPhase() + "】阶段");
            int runningTime = new Random().nextInt(5);
            try {
                TimeUnit.SECONDS.sleep(runningTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("【" + Thread.currentThread().getName() + "】冲过了终点。这是第【" + runningRacePhaser.getPhase() + "】阶段");
            runningRacePhaser.arriveAndAwaitAdvance();

            // 阶段3:冲过终点,等待裁判宣布比赛结束,退场
            System.out.println("裁判宣布比赛结束了,退场,溜了溜了。这是第【" + runningRacePhaser.getPhase() + "】阶段");
        }
    }

//    系统输出:
//    【运动员_0】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_2】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_1】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_3】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_4】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_5】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_6】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_7】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_3】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_2】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_7】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_6】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_5】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_0】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_1】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_4】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_6】冲过了终点。这是第【1】阶段
//    【运动员_2】冲过了终点。这是第【1】阶段
//    【运动员_0】冲过了终点。这是第【1】阶段
//    【运动员_1】冲过了终点。这是第【1】阶段
//    【运动员_5】冲过了终点。这是第【1】阶段
//    【运动员_3】冲过了终点。这是第【1】阶段
//    【运动员_4】冲过了终点。这是第【1】阶段
//    【运动员_7】冲过了终点。这是第【1】阶段
//    裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
}

示例3:(模拟百米赛跑,但是运动员有几率受伤退场。动态修改任务数)
代码示例如下:

public class PhaserRunningRaceTest {

    private static Phaser runningRacePhaser = new Phaser(8);

    public static void main(String[] args) {
        for (int i = 0; i < 8; i++) {
            final String name = "运动员_" + i;
            Athlet athlet = new Athlet();
            athlet.setName(name);
            athlet.start();
        }
    }

    public static class Athlet extends Thread {

        @Override
        public void run() {

            // 阶段1:等待裁判吹哨出发
            System.out.println("【" + Thread.currentThread().getName() + "】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【" + runningRacePhaser.getPhase() + "】阶段");
            runningRacePhaser.arriveAndAwaitAdvance();

            // 阶段2:开炮
            System.out.println("【" + Thread.currentThread().getName() + "】听到枪声响了,冲啊!!!!!这是第【" + runningRacePhaser.getPhase() + "】阶段");
            int runningTime = new Random().nextInt(5);
            try {
                if (runningTime <= 2) {
                    TimeUnit.SECONDS.sleep(runningTime);
                    runningRacePhaser.arriveAndDeregister();
                    System.out.println("【" + Thread.currentThread().getName() + "】摔了个狗吃屎只能退场了!!!这是第【" + runningRacePhaser.getPhase() + "】阶段");
                    return;
                } else {
                    TimeUnit.SECONDS.sleep(runningTime);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("【" + Thread.currentThread().getName() + "】冲过了终点。这是第【" + runningRacePhaser.getPhase() + "】阶段");
            runningRacePhaser.arriveAndAwaitAdvance();

            // 阶段3:冲过终点,等待裁判宣布比赛结束,退场
            System.out.println("【" + Thread.currentThread().getName() + "】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【" + runningRacePhaser.getPhase() + "】阶段");
        }
    }

//    系统输出:
//    【运动员_0】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_4】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_3】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_2】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_7】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_1】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_6】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_5】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
//    【运动员_5】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_4】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_0】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_7】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_6】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_1】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_2】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_3】听到枪声响了,冲啊!!!!!这是第【1】阶段
//    【运动员_2】摔了个狗吃屎只能退场了!!!这是第【1】阶段
//    【运动员_6】摔了个狗吃屎只能退场了!!!这是第【1】阶段
//    【运动员_7】冲过了终点。这是第【1】阶段
//    【运动员_4】冲过了终点。这是第【1】阶段
//    【运动员_1】冲过了终点。这是第【1】阶段
//    【运动员_0】冲过了终点。这是第【1】阶段
//    【运动员_5】冲过了终点。这是第【1】阶段
//    【运动员_3】冲过了终点。这是第【1】阶段
//    【运动员_0】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    【运动员_4】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    【运动员_3】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    【运动员_7】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    【运动员_1】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
//    【运动员_5】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
}

示例4:(比较复杂的版本,公司团建。阶段性任务;动态修改参与任务数parities)

  • 场景:某公司举行了一个小团建,分为4个阶段。阶段1来公司集合;阶段2出发去公园;阶段3去餐厅;阶段4吃饭。由于有人不爱运动,所以,本次活动全程参与的人只有3人;有2个员工今晚有约了,不吃饭,所以只参加前面两个阶段;另外有4个人,不喜欢运动,所以只参加后面的2个阶段
  • 思路:每个阶段参与人数不一样,所以不初始化parites了,而是动态添加。注册3个任务,执行全部阶段的;注册2个,只参加前面2个阶段的;注册4个,只参加后面2个阶段的
package org.example.lock;

import java.util.Random;
import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) {

        final Phaser phaser = new Phaser() {
            //重写该方法来增加阶段到达动作
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // 参与者数量,去除主线程
                int staffs = registeredParties - 1;
                switch (phase) {
                    case 0:
                        System.out.println("大家都到公司了,出发去公园,人数:" + staffs);
                        break;
                    case 1:
                        System.out.println("大家都到公园门口了,出发去餐厅,人数:" + staffs);
                        break;
                    case 2:
                        System.out.println("大家都到餐厅了,开始用餐,人数:" + staffs);
                        break;

                }

                // 判断是否只剩下主线程(一个参与者),如果是,则返回true,代表终止
                return registeredParties == 1;
            }
        };

        // 注册主线程 ———— 让主线程全程参与
        phaser.register();
        final StaffTask staffTask = new StaffTask();

        // 3个全程参与团建的员工
        for (int i = 0; i < 3; i++) {
            // 添加任务数
            phaser.register();
            new Thread(() -> {
                try {
                    staffTask.step1Task();
                    //到达后等待其他任务到达
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step2Task();
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step3Task();
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step4Task();
                    // 完成了,注销离开
                    phaser.arriveAndDeregister();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 两个不聚餐的员工加入
        for (int i = 0; i < 2; i++) {
            phaser.register();
            new Thread(() -> {
                try {
                    staffTask.step1Task();
                    phaser.arriveAndAwaitAdvance();

                    staffTask.step2Task();
                    System.out.println("员工【" + Thread.currentThread().getName() + "】回家了");
                    // 完成了,注销离开
                    phaser.arriveAndDeregister();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        while (!phaser.isTerminated()) {
            int phase = phaser.arriveAndAwaitAdvance();
            if (phase == 2) {
                // 到了去餐厅的阶段,又新增4人,参加晚上的聚餐
                for (int i = 0; i < 4; i++) {
                    phaser.register();
                    new Thread(() -> {
                        try {
                            staffTask.step3Task();
                            phaser.arriveAndAwaitAdvance();

                            staffTask.step4Task();
                            // 完成了,注销离开
                            phaser.arriveAndDeregister();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }).start();
                }
            }
        }
    }

    static final Random random = new Random();

    static class StaffTask {
        public void step1Task() throws InterruptedException {
            // 第一阶段:来公司集合
            String staff = "员工【" + Thread.currentThread().getName() + "】";
            System.out.println(staff + "从家出发了……");
            Thread.sleep(random.nextInt(5000));
            System.out.println(staff + "到达公司");
        }

        public void step2Task() throws InterruptedException {
            // 第二阶段:出发去公园
            String staff = "员工【" + Thread.currentThread().getName() + "】";
            System.out.println(staff + "出发去公园玩");
            Thread.sleep(random.nextInt(5000));
            System.out.println(staff + "到达公园门口集合");

        }

        public void step3Task() throws InterruptedException {
            // 第三阶段:去餐厅
            String staff = "员工【" + Thread.currentThread().getName() + "】";
            System.out.println(staff + "出发去餐厅");
            Thread.sleep(random.nextInt(5000));
            System.out.println(staff + "到达餐厅");

        }

        public void step4Task() throws InterruptedException {
            // 第四阶段:就餐
            String staff = "员工【" + Thread.currentThread().getName() + "】";
            System.out.println(staff + "开始用餐");
            Thread.sleep(random.nextInt(5000));
            System.out.println(staff + "用餐结束,回家");
        }
    }
}

使用总结

以下是一些常见的 Phaser 应用场景:

  1. 多线程任务分配:Phaser 可以用于将复杂的任务分配给多个线程执行,并协调线程间的合作。
  2. 多级任务流程:Phaser 可以用于实现多级任务流程,在每一级任务完成后触发下一级任务的开始。
  3. 模拟并行计算:Phaser 可以用于模拟并行计算,协调多个线程间的工作。
  4. 阶段性任务:Phaser 可以用于实现阶段性任务,在每一阶段任务完成后触发下一阶段任务的开始。

学习总结

  1. 学习了ReentrantLock及其特性,还有它与synchronized的区别。如:前者可以切换公平锁,可以设置超时时间,另外还提供尝试获取锁的机制
  2. 学习了Semaphore信号量,在服务限量跟池化技术上应用
  3. 学习了CountDownLatch闭锁,用来在并行任务同步或者多任务汇总的场景下
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐