ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的。
1.什么是阻塞队列

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来。
Java提供了很多种阻塞队列,如下所示:
2.生产者与消费者案例
问题原型如下: 妈妈负责做馒头,大林和小林负责吃馒头,厨房有个锅,锅最多放10个馒头,妈妈一共做100个馒头就结束。 使用多线程实现生产者与消费者的多线程例子。统计大林和小林各吃了多少个馒头?
传统对象锁实现方式:
/*
* 一共三个线程
*
* 一个生产者(Producer)线程:妈妈做馒头的线程。
* 两个消费者(Consumer)线程:两个儿子不停的吃馒头的线程。
*
* 临界资源就是锅(Pot)。
* */
import java.util.Random;
import java.util.Stack;
class Pot {
public static final int MAX_NUM = 100; //妈妈一共做100个馒头结束了。
public static final int POT_LEN = 10; //表示这个锅临界资源最多只能蒸10个馒头
public Stack<Integer> stack = new Stack<Integer>(); //锅临界资源
public static int iCounter = 0;
public int bigSonNumber = 0; //统计大林吃了多个馒头。
public int smallSonNumber = 0; //统计小林吃了多少各馒头。
//往锅里添加馒头。
public void makeCake() {
Pot.iCounter++;
stack.push(Pot.iCounter);
System.out.println("妈妈做完了第:" + Pot.iCounter + "个馒头....");
}
//从锅里取馒头吃
public void eatCake() {
int temp = stack.pop();
String name = Thread.currentThread().getName();
if ("大林".equals(name)) {
bigSonNumber++;
} else {
smallSonNumber++;
}
System.out.println(name + "吃掉第" + temp + "个馒头....");
}
}
//生产者的线程类
class Producer implements Runnable {
private Pot pot;
public Producer(Pot pot) {
this.pot = pot;
}
@Override
public void run() {
try {
while (true) {
//妈妈不停的做馒头。。。
synchronized (pot) {
//判断馒头是否做完了。退出线程
if (Pot.iCounter >= 100) {
System.out.println("100个馒头做完了,明年接着做馒头....");
//唤醒儿子吃馒头。
pot.notifyAll();
break;
}
//判断这个锅是不是满了。如果满了。怎么办。
if (pot.stack.size() == Pot.POT_LEN) {
//说明锅里的馒头已经装满了.
pot.notifyAll(); //唤醒儿子线程过来吃馒头。
pot.wait();
} else {
pot.makeCake();
try {
Thread.sleep(20);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
class Consumer implements Runnable {
private Pot pot;
public Consumer(Pot pot) {
this.pot = pot;
}
@Override
public void run() {
try {
while (true) {
synchronized (pot) {
//儿子不停的吃馒头..
if (Pot.iCounter >= 100 && pot.stack.empty()) {
System.out.println("100个馒头全部吃完了,明年继续吃馒头....");
break;
}
//判断锅现在是不是空的。
if (pot.stack.isEmpty()) {
//说明锅是空的。
pot.notifyAll(); //唤醒妈妈过来做馒头。
pot.wait();
} else {
pot.eatCake();
if (Pot.iCounter >= 100 && pot.stack.empty()) {
System.out.println("100个馒头全部吃完了,明年继续吃馒头....");
pot.notifyAll(); //唤醒妈妈过来做馒头。
break;
}
try {
//int randomTimer = Math.abs(new Random(50).nextInt());
pot.notifyAll(); //唤醒妈妈过来做馒头。
pot.wait();
Thread.sleep(5);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
public class ProducerAndConsumerDemo {
public static void main(String[] args) {
Pot pot = new Pot();
Producer mother = new Producer(pot);
Consumer consumer = new Consumer(pot);
//生成三个线程
Thread motherThead = new Thread(mother);
Thread bigSonThread = new Thread(consumer, "大林");
Thread smallSonThread = new Thread(consumer, "小林");
motherThead.start();
bigSonThread.start();
smallSonThread.start();
try {
motherThead.join();
bigSonThread.join();
smallSonThread.join();
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println("大林吃了:" + pot.bigSonNumber + "个馒头!");
System.out.println("小林吃了:" + pot.smallSonNumber + "个馒头!");
}
}
使用ArrayBlockingQueue阻塞队列实现如下:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ArrayBlockingQueueDemo {
private final static ArrayBlockingQueue<Cake> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
Thread mother = new Thread(new Producer(queue));
Consumer c = new Consumer(queue);
Thread bigSon = new Thread(c, "大林");
Thread smallSon = new Thread(c, "小林");
mother.start();
bigSon.start();
smallSon.start();
try {
mother.join();
bigSon.join();
smallSon.join();
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println("大林吃了:" + Consumer.bigSonNum + "个馒头。");
System.out.println("小林吃了:" + Consumer.smallSonNum + "个馒头。");
}
}
class Cake {
public volatile static AtomicInteger iCount = new AtomicInteger(100);
public Cake() {
Cake.iCount.decrementAndGet();
}
@Override
public String toString() {
return "馒头";
}
}
/**
* 生产者线程
*/
class Producer implements Runnable {
private final ArrayBlockingQueue<Cake> mAbq;
Producer(ArrayBlockingQueue<Cake> arrayBlockingQueue) {
this.mAbq = arrayBlockingQueue;
}
@Override
public void run() {
while (true) {
if (Cake.iCount.get() <= 0) {
System.out.println("妈妈100个馒头做完了....");
break;
}
Produce();
}
}
private void Produce() {
try {
Cake c = new Cake();
mAbq.put(c);
System.out.println("妈妈做一个" + c + ",还剩:" + Cake.iCount.get() + "个馒头。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 消费者线程
*/
class Consumer implements Runnable {
public static int bigSonNum = 0;
public static int smallSonNum = 0;
;
private ArrayBlockingQueue<Cake> mAbq;
Consumer(ArrayBlockingQueue<Cake> arrayBlockingQueue) {
this.mAbq = arrayBlockingQueue;
}
@Override
public void run() {
if (Cake.iCount.get() <= 0 && mAbq.isEmpty()) {
return;
}
while (true) {
if (Cake.iCount.get() <= 0 && mAbq.isEmpty()) {
break;
}
try {
this.comsume();
if (Cake.iCount.get() <= 0 && mAbq.isEmpty()) {
break;
}
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private synchronized void comsume() throws InterruptedException {
if (Cake.iCount.get() <= 0 && mAbq.isEmpty()) {
return;
}
Cake c = mAbq.take();
String name = Thread.currentThread().getName();
if ("大林".equals(name)) {
Consumer.bigSonNum++;
} else {
Consumer.smallSonNum++;
}
System.out.println(name + "吃了一个" + c + ",还剩:" + Cake.iCount.get() + "个馒头。");
}
}
运行效果:
...
大林吃了一个馒头,还剩:0个馒头。
小林吃了一个馒头,还剩:0个馒头。
大林吃了:50个馒头。
小林吃了:50个馒头。