线程通信

线程通信的引入

应用场景:生产者和消费者问题

假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费;
如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止;
如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止。

问题分析
  • 这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件
  • 对于生产者,没有生产产品之前,要通知消费者等待。而生产了产品之后,又需要马上通知消费者消费
  • 对于消费者,在消费之后,要通知生产者已经消费结束,需要继续生产新产品以供消费
  • 在生产者消费者问题中,仅有synchronized是不够的
  • synchronized可阻止并发更新同一个共享资源,实现了同步
  • synchronized不能用来实现不同线程之间的消息传递(通信)
线程通信相关方法
wait()方法

wait()方法是让当前线程一直等待(即让线程释放了对共享对象的锁),直到其它线程通知 。

wait(long timeout)方法可以指定一个超时时间,过了这个时间如果没有被notify()唤醒,则函数还是会返回。如果传递一个负数timeout会抛出IllegalArgumentException异常。参数为毫秒值。

notify()方法

notify()方法唤醒一个处于等待状态的线程(调用了wait()方法而进入等待状态的线程)。

notifyAll()方法

notifyAll()方法会唤醒所有在共享变量上由于调用wait系列方法而进入等待状态的线程。

注意:

  1. notifyAll()中All的含义是所有的线程,而不是所有的锁,只能唤醒等待(调用wait()方法等待)同一个锁的所有线程,这一点一定要注意。

  2. notifyAll()必须在当前线程拥有监视器锁的情况下执行,否则将抛出异常IllegalMonitorStateException。意思是说必须在同步代码块中,调用此方法,否则可能出现在没有得到锁的情况下,执行了此方法,导致程序异常。wait()和notify()系列方法这样设计的目的是防止死锁或永久等待发生。

  3. notifyAll()方法任何对象都可以调用,并且无法重写此方法,因为被final修饰。

  4. notifyAll()只能释放一把锁。单独列出此条,这很重要。

  5. notifyAll()执行后,只有一个线程能得到锁,其他没有得到锁的线程会继续保持在等待状态。

注意事项
  • 这几个方法均是java.lang.Object类的方法。
  • 这几个方法都只能在同步方法或者同步代码块中使用,否则会抛出异常。
  • 只对当前单个共享变量生效,多个共享变量需要多次调用wait()方法。
  • 如果线程A调用wait()方法后处于堵塞状态时,其他线程中断(在其他线程调用A.interrupt()方法)A线程,则会抛出InterruptExcption异常而返回并终止。
案例代码展示
案例代码准备

商品类:

/**
 * 商品类
 */
public class Product {

    private String name;
    private String color;
    //判断是否有商品
    public boolean flag;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Product() {
    }

    public Product(String name, String color) {
        this.name = name;
        this.color = color;
    }

    @Override
    public String toString() {
        return "Product{" +
                "name='" + name + '\'' +
                ", color='" + color + '\'' +
                '}';
    }
}

生产者线程:

/**
 * 生产者线程
 */
public class ProduceRunnable implements Runnable{

    private Product product;

    public ProduceRunnable(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        int i = 0;
        while(true){
            if(i%2==0){
                product.setName("馒头");
                product.setColor("白色");
            }else{
                product.setName("玉米饼");
                product.setColor("黄色");
            }
            System.out.println("生产者生产商品:"+product.getName()
                    +"  "+product.getColor());
            i++;
        }
    }
}

消费者线程:

/**
 * 消费者线程
 */
public class ConsumeRunnable implements Runnable{
    private Product product;

    public ConsumeRunnable(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        while(true){
            System.out.println("消费者消费商品:"+product.getName()
                    +"  "+product.getColor());
        }

    }
}

测试类:

/**
 * 测试类
 */
public class Test {
    public static void main(String[] args) {
        Product product = new Product();

        //生产者线程
        Runnable runnable1 = new ProduceRunnable(product);
        Thread thread1 = new Thread(runnable1);
        //消费者线程
        Runnable runnable2 = new ConsumeRunnable(product);
        Thread thread2 = new Thread(runnable2);

        thread1.start();
        thread2.start();

    }
}
方案一:使用同步代码块实现线程同步

需要更改生产者线程和消费者消费者线程,更改后的代码如下:

/**
 * 生产者线程
 */
public class ProduceRunnable implements Runnable{

    private Product product;

    public ProduceRunnable(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        int i = 0;
        while(true){
            synchronized (product){
                if(i%2==0){
                    product.setName("馒头");
                    product.setColor("白色");
                }else{
                    product.setName("玉米饼");
                    product.setColor("黄色");
                }
                System.out.println("生产者生产商品:"+product.getName()
                        +"  "+product.getColor());
                i++;
            }
        }
    }
}

/**
 * 消费者线程
 */
public class ConsumeRunnable implements Runnable{
    private Product product;

    public ConsumeRunnable(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        while(true){
           synchronized (product){
               System.out.println("消费者消费商品:"+product.getName()
                       +"  "+product.getColor());
           }
        }

    }
}

注意:不仅生产者要加锁,而且消费者也要加锁,并且必须是一把锁(不仅是一个引用变量,而且必须是指向同一个对象)

方案一:实现线程通信——同步代码块

需要更改生产者线程和消费者消费者线程,更改后的代码如下:

/**
 * 生产者线程
 */
public class ProduceRunnable implements Runnable{

    private Product product;

    public ProduceRunnable(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        int i = 0;
        while(true){
            synchronized (product){

                //如果已经有商品,生产者线程等待
                if(product.flag){
                    try {
                        product.wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                if(i%2==0){
                    product.setName("馒头");
                    product.setColor("白色");
                }else{
                    product.setName("玉米饼");
                    product.setColor("黄色");
                }
                System.out.println("生产者生产商品:"+product.getName()
                        +"  "+product.getColor());
                //生产完商品,修改商品状态,表示已经有商品
                product.flag = true;
                //通知消费者消费
                product.notify();

            }
            i++;
        }
    }
}
/**
 * 消费者线程
 */
public class ConsumeRunnable implements Runnable{
    private Product product;

    public ConsumeRunnable(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        while(true){
           synchronized (product){
               //如果没有商品,消费者等待
               if(!product.flag){
                   try {
                       product.wait();
                   } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                   }
               }
               System.out.println("消费者消费商品:"+product.getName()
                       +"  "+product.getColor());
               //消费完商品后,修改商品状态,表示没有商品
               product.flag = false;
               //通知生产者生产商品
               product.notify();
           }
        }

    }
}

运行结果(截取了一部非,应该为无限循环打印):

/**
*代码实现线程同步和线程通信后,解决了生产者和消费者问题
*/
生产者生产商品:馒头  白色
消费者消费商品:馒头  白色
生产者生产商品:玉米饼  黄色
消费者消费商品:玉米饼  黄色

注意:进行线程通信的多个线程,要使用同一个同步监视器(product),还必须要调用该同步监视器的wait()、notify()、notifyAll();

方案二:实现线程通信——同步方法

需要对商品类、生产者线程和消费者线程进行更改,更改后的代码如下:

/**
 * 商品类
 */
public class Product {

    private String name;
    private String color;

    //boolean类数据默认值为false:即默认没有商品
    public boolean flag;

    //生产商品方法
    public synchronized void produce(String name, String color) {//this
        //如果已经有商品,生产者线程等待
        if (flag) {
            try {
                //让出了CPU,会同时释放锁
                this.wait(); //必须调用同步监视器的通信方法
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //生产商品
        this.name = name;
        try {
            Thread.sleep(1); //让出了CPU,不释放锁
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.color = color;
        //输出结果
        System.out.println("生产者生产商品" + getName()+" "+getColor());
        //生产完商品,修改商品状态,表示已经有商品
        flag = true;
        //通知消费者消费
        this.notify();
    }
    //消费商品方法
    public synchronized void consume() {//this
        //如果没有商品,消费者等待
        if(!flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //消费商品
        System.out.println("消费者消费商品" +name+"  "+color);
        //消费完商品后,修改商品状态,表示没有商品
        flag = false;
        //通知生产者生产商品
        this.notifyAll();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Product() {
    }

    public Product(String name, String color) {
        this.name = name;
        this.color = color;
    }

    @Override
    public String toString() {
        return "Product{" +
                "name='" + name + '\'' +
                ", color='" + color + '\'' +
                '}';
    }
}

/**
 * 生产者线程
 */
public class ProduceRunnable implements Runnable{

    private Product product;

    public ProduceRunnable(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        int i = 0;
        while(true){
            if(i%2==0){
                product.produce("馒头","白色");
            }else{
                product.produce("玉米饼","黄色");
            }
            i++;
        }
    }
}
/**
 * 消费者线程
 */
public class ConsumeRunnable implements Runnable{
    private Product product;

    public ConsumeRunnable(Product product) {
        this.product = product;
    }

    @Override
    public void run() {
        while(true){
            product.consume();
        }

    }
}

运行结果(截取了一部非,应该为无限循环打印):

/**
*代码实现线程同步和线程通信后,解决了生产者和消费者问题
*/
生产者生产商品:馒头  白色
消费者消费商品:馒头  白色
生产者生产商品:玉米饼  黄色
消费者消费商品:玉米饼  黄色

需要注意的是:

  • 同步方法的同步监视器都是this,所以需要将produce()和consume()放入一个类Product中,保证是同一把锁
  • 必须调用this的wait()、notify()、notifyAll()方法,this可以省略,因为同步监视器是this。
方案三:实现线程通信——使用Lock锁

之前实现线程通信时,是生产者和消费者在一个等待队列中,会存在本来打算唤醒消费者,却唤醒一个生产者的问题,能否让生产者和消费者线程在不同的队列中等待呢?在新一代的Lock中提供这种实现方式。

需要对商品类、生产者线程和消费者线程进行更改,更改后的代码如下:

/**
 * 商品类
 */
public class Product {

    private String name;
    private String color;

    //boolean类数据默认值为false:即默认没有商品
    public boolean flag;

    Lock lock = new ReentrantLock();
    Condition produceCondition = lock.newCondition();
    Condition consumeCondition = lock.newCondition();

    //生产商品方法
    public void produce(String name, String color) {//this

        lock.lock();
        try {
            //如果已经有商品,生产者线程等待
            if (flag) {
                try {
                    //让出了CPU,会同时释放锁
                    produceCondition.await(); //必须调用同步监视器的通信方法
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //生产商品
            this.name = name;
            try {
                Thread.sleep(1); //让出了CPU,不释放锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.color = color;
            //输出结果
            System.out.println("生产者生产商品" + getName()+" "+getColor());
            //生产完商品,修改商品状态,表示已经有商品
            flag = true;
            //通知消费者消费
            consumeCondition.signal();
        }finally {
            lock.unlock();
        }

    }
    //消费商品方法
    public  void consume() {//this
        lock.lock();
        try {
            //如果没有商品,消费者等待
            if(!flag){
                try {
                    consumeCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //消费商品
            System.out.println("消费者消费商品" +name+"  "+color);
            //消费完商品后,修改商品状态,表示没有商品
            flag = false;
            //通知生产者生产商品
            produceCondition.signalAll();
        }finally {
            lock.unlock();
        }

    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Product() {
    }

    public Product(String name, String color) {
        this.name = name;
        this.color = color;
    }

    @Override
    public String toString() {
        return "Product{" +
                "name='" + name + '\'' +
                ", color='" + color + '\'' +
                '}';
    }
}

运行结果(截取了一部非,应该为无限循环打印):

/**
*代码实现线程同步和线程通信后,解决了生产者和消费者问题
*/
生产者生产商品:馒头  白色
消费者消费商品:馒头  白色
生产者生产商品:玉米饼  黄色
消费者消费商品:玉米饼  黄色
Condition

Condition是在JDK1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。

它的更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition 。

一个Condition包含一个等待队列。一个Lock可以产生多个Condition,所以可以有多个等待队列。

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而Lock(同步器)拥有一个同步队列和多个等待队列。
Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。
调用Condition的await()、signal()、signalAll()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用 。

  • Conditon 中的 await() 对应 Object 的 wait();
  • Condition 中的 signal() 对应 Object 的 notify();
  • Condition 中的 signalAll() 对应 Object 的 notifyAll()。

await()方法:

使当前线程在接到信号或被中断之前一直处于等待状态。

与此 Condition 相关的锁以原子方式释放,并且出于线程调度的目的,将禁用当前线程,且在发生以下四种情况之一 以前,当前线程将一直处于休眠状态:

  • 其他某个线程调用此 Condition 的 signal() 方法,并且碰巧将当前线程选为被唤醒的线程;
  • 其他某个线程调用此 Condition 的 signalAll() 方法;
  • 其他某个线程中断当前线程,且支持中断线程的挂起;
  • 发生“虚假唤醒”
    在所有情况下,在此方法可以返回当前线程之前,都必须重新获取与此条件有关的锁。在线程返回时,可以保证它保持此锁。

signal()方法:
唤醒一个等待线程。
如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。

signalAll()方法:
唤醒所有等待线程。
如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。

同步队列和等待队列是什么意思

在Java中,同步队列和等待队列是多线程同步机制中的两个重要概念。

同步队列是指线程在获取对象锁时,需要先进入同步队列中等待获取锁的队列。同步队列是一个FIFO(先进先出)的队列,线程按照获取锁的顺序排队等待锁的释放。当锁被释放时,同步队列中的第一个线程会被唤醒,并且获取锁之后执行相应的操作。

等待队列是指线程调用对象的wait()方法后,进入等待队列中等待被唤醒。等待队列也是一个FIFO的队列,线程按照调用wait()方法的顺序排队等待被唤醒。当另外一个线程调用对象的notify()或notifyAll()方法时,等待队列中的线程会被唤醒,并且进入同步队列中等待获取锁。

对于Object的监视器模型,每个对象都有一个同步队列和等待队列,用于实现对象锁和等待唤醒机制。

对于Lock(同步器)来说,它也有一个同步队列和多个等待队列,用于实现锁的获取和释放,以及等待唤醒机制。Lock提供了更加灵活和可扩展的锁机制,支持更多的等待队列,可以更加精细地掌控线程的并发访问。

同步队列和等待队列的区别

同步队列和等待队列是用于实现线程同步和等待唤醒机制的重要数据结构。它们的主要区别在于其所处的上下文环境和作用。

同步队列是指在同步块或者同步方法中,线程竞争对象锁失败后所进入的队列。同步队列的作用是用于实现线程的互斥和同步,保证同一时间只有一个线程能够访问同步块或同步方法。

等待队列则是在线程等待唤醒机制中,当线程等待某个条件达成而被阻塞时所进入的队列。等待队列的作用是用于实现线程的等待唤醒机制,让线程在某个条件不满足时主动进入等待队列,并在条件满足时被唤醒。

在Java的Object监视器模型中,一个对象拥有一个同步队列和一个等待队列。同步队列用于实现对象锁的获取和释放,而等待队列用于实现线程的等待唤醒机制。而在Lock(同步器)的实现中,一个Lock对象拥有一个同步队列和多个等待队列,用于实现锁的获取和释放,以及等待唤醒机制。

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐