一、生产者—消费者模式介绍
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

二、为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

三、生产者消费者模型的实现
生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列,数据类型只需要定义一个简单的类就好。关键是如何处理多线程之间的协作。在这个模型中,最关键就是内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者必须等待(使用wait()和notifyAll()来实现通知与等待)。其他时候可以是个动态平衡。值得注意的是多线程对临界区资源的操作时候必须保证在读写中只能存在一个线程,所以需要设计锁的策略。

四、Java代码实现生产者—消费者模式
①.Message 类

package chapter2.producerconsumer;

/**
 * @author czd
 */
public class Message {
    private String data;

    public Message(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}

②.MessageQueue 类

package chapter2.producerconsumer;

import java.util.LinkedList;

/**
 * @author czd
 */
public class MessageQueue {
    private LinkedList<Message> queue ;
    private static int MAX_LIMIT = 100;
    private int limit;

    public MessageQueue() {
        this(MAX_LIMIT);
    }

    public MessageQueue(int limit) {
        this.queue = new LinkedList<>();
        this.limit = limit;
    }

    /**
     * 将数据存放到队列中
     * @param message
     */
    public void put(Message message){
        synchronized (queue){
            while (queue.size() > limit){
                try {
                    queue.wait();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }

    /**
     * 将队列的数据拿出来
     * @return
     */
    public Message take(){
        synchronized (queue){
            while (queue.isEmpty()){
                try {
                    queue.wait();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;

        }
    }

    /**
     *获取队列最大的存放数量
     * @return
     */
    public int getMaxLimit(){
        return this.limit;
    }

    /**
     * 获取队列当前大小
     * @return
     */
    public int getQueueSize(){
        synchronized (queue){
            return queue.size();
        }
    }
}

③.ProducerThread 类

package chapter2.producerconsumer;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author czd
 */
public class ProducerThread extends Thread{
    private MessageQueue messageQueue;
    //创建初始值为0的counter
    private static AtomicInteger counter = new AtomicInteger(0);

    public ProducerThread(MessageQueue messageQueue , int seq) {
        //为线程起名字
        super("Producer>>>>>" + seq);
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        while (true){
            try {
                Message message = new Message("Message>>>>>" + counter.getAndIncrement());
                messageQueue.put(message);
                System.out.println(Thread.currentThread().getName() + "   Put MessageData:" + message.getData());
                Thread.sleep(100);
            }catch (Exception e){
                break;
            }

        }
    }
}

④.ConsumerThread 类

package chapter2.producerconsumer;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author czd
 */
public class ConsumerThread extends Thread{
    private MessageQueue messageQueue;

    public ConsumerThread(MessageQueue messageQueue , int seq) {
        //为线程起名字
        super("Consumer>>>>>" + seq);
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        while (true){
            try {
                Message message = messageQueue.take();
                System.out.println(Thread.currentThread().getName() + "   get MessageData:" + message.getData());
                Thread.sleep(100);
            }catch (Exception e){
                break;
            }

        }
    }
}

⑤.ProducerAndConsumerTest 类

package chapter2.producerconsumer;

/**
 * @author czd
 */
public class ProducerAndConsumerTest {
    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue();
        new ProducerThread(messageQueue , 1).start();
        new ConsumerThread(messageQueue , 1).start();
        new ProducerThread(messageQueue , 2).start();
        new ConsumerThread(messageQueue , 2).start();
    }
}

输出结果
在这里插入图片描述

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐