← 返回首页
ArrayBlockingQueue实现生产者与消费者
发表时间:2023-04-10 16:58:33
ArrayBlockingQueue实现生产者与消费者

ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的。

1.什么是阻塞队列

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来。

Java提供了很多种阻塞队列,如下所示:

2.生产者与消费者案例

问题原型如下: 妈妈负责做馒头,大林和小林负责吃馒头,厨房有个锅,锅最多放10个馒头,妈妈一共做100个馒头就结束。 使用多线程实现生产者与消费者的多线程例子。统计大林和小林各吃了多少个馒头?

传统对象锁实现方式:

/*
 * 一共三个线程
 *
 * 一个生产者(Producer)线程:妈妈做馒头的线程。
 * 两个消费者(Consumer)线程:两个儿子不停的吃馒头的线程。
 *
 * 临界资源就是锅(Pot)。
 * */

import java.util.Random;
import java.util.Stack;

class Pot {
    public static final int MAX_NUM = 100; //妈妈一共做100个馒头结束了。
    public static final int POT_LEN = 10; //表示这个锅临界资源最多只能蒸10个馒头
    public Stack<Integer> stack = new Stack<Integer>(); //锅临界资源

    public static int iCounter = 0;

    public int bigSonNumber = 0; //统计大林吃了多个馒头。
    public int smallSonNumber = 0; //统计小林吃了多少各馒头。

    //往锅里添加馒头。
    public void makeCake() {
        Pot.iCounter++;
        stack.push(Pot.iCounter);
        System.out.println("妈妈做完了第:" + Pot.iCounter + "个馒头....");

    }

    //从锅里取馒头吃
    public void eatCake() {
        int temp = stack.pop();
        String name = Thread.currentThread().getName();
        if ("大林".equals(name)) {
            bigSonNumber++;
        } else {
            smallSonNumber++;
        }
        System.out.println(name + "吃掉第" + temp + "个馒头....");

    }

}

//生产者的线程类

class Producer implements Runnable {

    private Pot pot;

    public Producer(Pot pot) {
        this.pot = pot;
    }

    @Override
    public void run() {
        try {
            while (true) {
                //妈妈不停的做馒头。。。
                synchronized (pot) {
                    //判断馒头是否做完了。退出线程
                    if (Pot.iCounter >= 100) {
                        System.out.println("100个馒头做完了,明年接着做馒头....");
                        //唤醒儿子吃馒头。
                        pot.notifyAll();
                        break;
                    }

                    //判断这个锅是不是满了。如果满了。怎么办。
                    if (pot.stack.size() == Pot.POT_LEN) {
                        //说明锅里的馒头已经装满了.
                        pot.notifyAll(); //唤醒儿子线程过来吃馒头。
                        pot.wait();
                    } else {
                        pot.makeCake();
                        try {
                            Thread.sleep(20);
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private Pot pot;

    public Consumer(Pot pot) {
        this.pot = pot;
    }

    @Override
    public void run() {
        try {
            while (true) {
                synchronized (pot) {
                    //儿子不停的吃馒头..
                    if (Pot.iCounter >= 100 && pot.stack.empty()) {
                        System.out.println("100个馒头全部吃完了,明年继续吃馒头....");
                        break;
                    }
                    //判断锅现在是不是空的。
                    if (pot.stack.isEmpty()) {
                        //说明锅是空的。
                        pot.notifyAll(); //唤醒妈妈过来做馒头。
                        pot.wait();
                    } else {
                        pot.eatCake();

                        if (Pot.iCounter >= 100 && pot.stack.empty()) {
                            System.out.println("100个馒头全部吃完了,明年继续吃馒头....");
                            pot.notifyAll(); //唤醒妈妈过来做馒头。
                            break;
                        }

                        try {
                            //int randomTimer = Math.abs(new Random(50).nextInt());
                            pot.notifyAll(); //唤醒妈妈过来做馒头。
                            pot.wait();

                            Thread.sleep(5);
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

public class ProducerAndConsumerDemo {

    public static void main(String[] args) {

        Pot pot = new Pot();
        Producer mother = new Producer(pot);
        Consumer consumer = new Consumer(pot);

        //生成三个线程
        Thread motherThead = new Thread(mother);
        Thread bigSonThread = new Thread(consumer, "大林");
        Thread smallSonThread = new Thread(consumer, "小林");

        motherThead.start();
        bigSonThread.start();
        smallSonThread.start();

        try {
            motherThead.join();
            bigSonThread.join();
            smallSonThread.join();
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        System.out.println("大林吃了:" + pot.bigSonNumber + "个馒头!");
        System.out.println("小林吃了:" + pot.smallSonNumber + "个馒头!");
    }
}

使用ArrayBlockingQueue阻塞队列实现如下:


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ArrayBlockingQueueDemo {
    private final static ArrayBlockingQueue<Cake> queue = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) {
        Thread mother = new Thread(new Producer(queue));
        Consumer c = new Consumer(queue);

        Thread bigSon = new Thread(c, "大林");
        Thread smallSon = new Thread(c, "小林");

        mother.start();
        bigSon.start();
        smallSon.start();

        try {
            mother.join();
            bigSon.join();
            smallSon.join();
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        System.out.println("大林吃了:" + Consumer.bigSonNum + "个馒头。");
        System.out.println("小林吃了:" + Consumer.smallSonNum + "个馒头。");
    }
}

class Cake {
    public volatile static AtomicInteger iCount = new AtomicInteger(100);

    public Cake() {
        Cake.iCount.decrementAndGet();
    }

    @Override
    public String toString() {
        return "馒头";
    }
}

/**
 * 生产者线程
 */
class Producer implements Runnable {
    private final ArrayBlockingQueue<Cake> mAbq;

    Producer(ArrayBlockingQueue<Cake> arrayBlockingQueue) {
        this.mAbq = arrayBlockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            if (Cake.iCount.get() <= 0) {
                System.out.println("妈妈100个馒头做完了....");
                break;
            }
            Produce();
        }
    }

    private void Produce() {
        try {
            Cake c = new Cake();
            mAbq.put(c);
            System.out.println("妈妈做一个" + c + ",还剩:" + Cake.iCount.get() + "个馒头。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/**
 * 消费者线程
 */
class Consumer implements Runnable {

    public static int bigSonNum = 0;
    public static int smallSonNum = 0;
    ;
    private ArrayBlockingQueue<Cake> mAbq;

    Consumer(ArrayBlockingQueue<Cake> arrayBlockingQueue) {
        this.mAbq = arrayBlockingQueue;
    }

    @Override
    public void run() {
        if (Cake.iCount.get() <= 0 && mAbq.isEmpty()) {
            return;
        }
        while (true) {
            if (Cake.iCount.get() <= 0 && mAbq.isEmpty()) {
                break;
            }
            try {
                this.comsume();
                if (Cake.iCount.get() <= 0 && mAbq.isEmpty()) {
                    break;
                }
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private synchronized void comsume() throws InterruptedException {
        if (Cake.iCount.get() <= 0 && mAbq.isEmpty()) {
            return;
        }

        Cake c = mAbq.take();
        String name = Thread.currentThread().getName();
        if ("大林".equals(name)) {
            Consumer.bigSonNum++;
        } else {
            Consumer.smallSonNum++;
        }
        System.out.println(name + "吃了一个" + c + ",还剩:" + Cake.iCount.get() + "个馒头。");


    }
}

运行效果:

...
大林吃了一个馒头,还剩:0个馒头。
小林吃了一个馒头,还剩:0个馒头。
大林吃了:50个馒头。
小林吃了:50个馒头。