← 返回首页
JavaSE系列教程(六十七)
发表时间:2020-02-12 12:36:54
讲解线程池的使用。

使用线程池可以提高系统的性能,因为自己创建线程和清除线程的开销比较大。Java中创建线程池很简单,只需要调用Executors中相应的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads)方法。

1.Executors创建线程池

方法名字 功能
newFixedThreadPool(int nThreads) 创建固定大小的线程池
newSingleThreadExecutor() 创建只有一个线程的线程池
newCachedThreadPool() 创建一个不限线程数上限的线程池,任何提交的任务都将立即执行
        ExecutorService executor1 = Executors.newFixedThreadPool(1);
        ExecutorService executor2 = Executors.newSingleThreadExecutor();
        ExecutorService executor3 = Executors.newCachedThreadPool();

2.ThreadPoolExecutor创建线程池

小程序使用Executors快捷方法没什么问题,对于服务端需要长期运行的程序,创建线程池应该直接使用ThreadPoolExecutor的构造方法。

例如:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                new ThreadPoolExecutor.DiscardOldestPolicy());


构造方法参数的详细解释如下:
// Java线程池的完整构造函数
public ThreadPoolExecutor(
  int corePoolSize, // 线程池长期维持的线程数,即使线程处于Idle状态,也不会回收。
  int maximumPoolSize, // 线程数的上限
  long keepAliveTime, TimeUnit unit, // 超过corePoolSize的线程的idle时长,
                                     // 超过这个时间,多余的线程会被回收。
  BlockingQueue<Runnable> workQueue, // 任务的排队队列
  ThreadFactory threadFactory, // 新线程的产生方式
  RejectedExecutionHandler handler) // 拒绝策略

以运营一家装修公司做个比喻。公司在办公地点等待客户来提交装修请求;公司有固定数量的正式工以维持运转;旺季业务较多时,新来的客户请求会被排期,比如接单后告诉用户一个月后才能开始装修;当排期太多时,为避免用户等太久,公司会通过某些渠道(比如人才市场、熟人介绍等)雇佣一些临时工(注意,招聘临时工是在排期排满之后);如果临时工也忙不过来,公司将决定不再接收新的客户,直接拒单。

线程池就是程序中的“装修公司”,代劳各种脏活累活。上面的过程对应到线程池上:

// Java线程池的完整构造函数
public ThreadPoolExecutor(
  int corePoolSize, // 正式工数量
  int maximumPoolSize, // 工人数量上限,包括正式工和临时工
  long keepAliveTime, TimeUnit unit, // 临时工游手好闲的最长时间,超过这个时间将被解雇
  BlockingQueue<Runnable> workQueue, // 排期队列
  ThreadFactory threadFactory, // 招人渠道
  RejectedExecutionHandler handler) // 拒单方式

3.线程池提交任务的三种方式

方法名字 是否关心返回结果
Future submit(Callable task)
void execute(Runnable command)
Future<?> submit(Runnable task) 否,虽然返回Future,但是其get()方法总是返回null

4.线程池关闭

在线程池使用完成之后,我们需要对线程池中的资源进行释放操作,这就涉及到关闭功能。我们可以调用线程池对象的shutdown()和shutdownNow()方法来关闭线程池。

这两个方法都是关闭操作,又有什么不同呢?

1).shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。 2).shutdownNow()会将线程池状态置为SHUTDOWN,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。

另外,关闭线程池涉及到两个返回boolean的方法,isShutdown()和isTerminated,分别表示是否关闭和是否终止。

5.综合实例

妈妈负责做馒头,大林和小林负责吃馒头,厨房有个锅,锅最多放10个馒头,妈妈一共做100个馒头就结束。 使用使用线程池创建多线程实现生产者与消费者的多线程例子。统计大林和小林各吃了多少个馒头?

//锅类
/*
* 有限缓冲区
* Stack描述锅。后进先出
*
* */
public class Pot {

    private Stack<Integer> pot = new Stack<Integer>();

    public static final int MAX_LEN = 10; //锅最多只能放10个馒头。
    public static final int MAX_NUMBER =100; //妈妈一共做100个馒头,程序就结束了。
    public static int COUNT = 0; //统计现在做了多少个馒头。
    private int bigSonNum; //大儿子吃的数量
    private int smallSonNum; //小儿子吃的数量

    public Stack<Integer> getPot() {
        return pot;
    }

    //做馒头的方法
    public synchronized void makeCake(){

        COUNT++;
        pot.push(COUNT);
        System.out.println("妈妈做了第" + COUNT + "个馒头....");
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //吃馒头的方法
    public synchronized  void eatCake(String name){
        int temp = pot.pop(); //弹出一个馒头。
        System.out.println(name+"吃了第" + temp + "个馒头...");

        if("大林".equals(name)){
            this.bigSonNum++;
        }else{
            this.smallSonNum++;
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    //显示结果
    public void showResult(){
        System.out.println("大林吃了:" + bigSonNum + "个馒头!");
        System.out.println("小林吃了:" + smallSonNum+"个馒头!");
    }

}


//生产者,妈妈类
public class Producer implements Runnable {

    private Pot pot;

    public Producer(Pot pot) {
        this.pot = pot;
    }

    @Override
    public void run() {
        while(true){
          synchronized (pot) {
              //不停的做馒头。。。
              //判断什么时候就不做了。
              if (Pot.COUNT >= 100) {
                  break;
              }
              //怎么判断锅填满了呢。
              if (pot.getPot().size() == Pot.MAX_LEN) {
                  //妈妈就进入等待队列了,等待儿子吃馒头。
                  System.out.println("锅里馒头满了,等待儿子吃馒头....");
                  try {
                      //可以换线儿子吃馒头。
                      pot.notifyAll();
                      pot.wait();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              } else {
                  //可以换线儿子吃馒头。
                  pot.notifyAll();
                  pot.makeCake();
              }
          }

        }
    }
}

//消费者类,儿子类
public class Consumer implements Runnable {

    private Pot pot;
    private String name;


    public Consumer(Pot pot,String name) {
        this.pot = pot;
        this.name = name;
    }

    @Override
    public void run() {
        while(true){

            synchronized (pot) {
                //儿子不停的吃馒头。
                //判断什么时候不能了。
                if (pot.getPot().size() == 0 && Pot.COUNT >= 100) {
                    break;
                }

                //判断锅里有馒头吗?
                if (pot.getPot().size() == 0) {
                    //妈妈就进入等待队列了,等待儿子吃馒头。
                    System.out.println("锅里没有馒头了,等待妈妈做馒头....");
                    try {
                        //可以换醒妈妈做吃馒头。
                        pot.notifyAll();
                        //pot.notify();
                        pot.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    pot.eatCake(this.name);
                    pot.notifyAll();
                }
            }
        }
    }
}

public class Test {
     //阻塞式关闭线程池。 
     public static void awaitAfterShutdown(ExecutorService threadPool) {
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(200, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (InterruptedException ex) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }

    }

    public static void main(String[] args) {

        Pot pot = new Pot();
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 10, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Producer mother = new Producer(pot);
        Consumer bigSon = new Consumer(pot,"大林");
        Consumer smallSon = new Consumer(pot,"小林");

        threadPool.execute(mother);
        threadPool.execute(bigSon);
        threadPool.execute(smallSon);

        //threadPool.shutdown(); //这样关闭线程池并不能保证结果是在三个线程结束后统计。
        awaitAfterShutdown(threadPool); //优雅地等待线程池的结束,使用阻塞方法,保证线程结束后显示结果。
        pot.showResult();//显示结果
    }
}

运行结果:
妈妈做了第1个馒头....
妈妈做了第2个馒头....
妈妈做了第3个馒头....
妈妈做了第4个馒头....
妈妈做了第5个馒头....
妈妈做了第6个馒头....
妈妈做了第7个馒头....
妈妈做了第8个馒头....
妈妈做了第9个馒头....
妈妈做了第10个馒头....
锅里馒头满了,等待儿子吃馒头....
小林吃了第10个馒头...
小林吃了第9个馒头...
小林吃了第8个馒头...
小林吃了第7个馒头...
小林吃了第6个馒头...
小林吃了第5个馒头...
小林吃了第4个馒头...
小林吃了第3个馒头...
小林吃了第2个馒头...
小林吃了第1个馒头...
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第11个馒头....
妈妈做了第12个馒头....
妈妈做了第13个馒头....
妈妈做了第14个馒头....
妈妈做了第15个馒头....
妈妈做了第16个馒头....
妈妈做了第17个馒头....
妈妈做了第18个馒头....
妈妈做了第19个馒头....
妈妈做了第20个馒头....
锅里馒头满了,等待儿子吃馒头....
大林吃了第20个馒头...
大林吃了第19个馒头...
大林吃了第18个馒头...
大林吃了第17个馒头...
大林吃了第16个馒头...
大林吃了第15个馒头...
大林吃了第14个馒头...
大林吃了第13个馒头...
大林吃了第12个馒头...
大林吃了第11个馒头...
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第21个馒头....
妈妈做了第22个馒头....
妈妈做了第23个馒头....
妈妈做了第24个馒头....
妈妈做了第25个馒头....
妈妈做了第26个馒头....
妈妈做了第27个馒头....
妈妈做了第28个馒头....
妈妈做了第29个馒头....
妈妈做了第30个馒头....
锅里馒头满了,等待儿子吃馒头....
大林吃了第30个馒头...
大林吃了第29个馒头...
大林吃了第28个馒头...
大林吃了第27个馒头...
大林吃了第26个馒头...
大林吃了第25个馒头...
大林吃了第24个馒头...
大林吃了第23个馒头...
大林吃了第22个馒头...
大林吃了第21个馒头...
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第31个馒头....
妈妈做了第32个馒头....
妈妈做了第33个馒头....
妈妈做了第34个馒头....
妈妈做了第35个馒头....
妈妈做了第36个馒头....
妈妈做了第37个馒头....
妈妈做了第38个馒头....
妈妈做了第39个馒头....
妈妈做了第40个馒头....
锅里馒头满了,等待儿子吃馒头....
大林吃了第40个馒头...
大林吃了第39个馒头...
大林吃了第38个馒头...
大林吃了第37个馒头...
大林吃了第36个馒头...
大林吃了第35个馒头...
大林吃了第34个馒头...
大林吃了第33个馒头...
大林吃了第32个馒头...
大林吃了第31个馒头...
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第41个馒头....
妈妈做了第42个馒头....
妈妈做了第43个馒头....
妈妈做了第44个馒头....
妈妈做了第45个馒头....
妈妈做了第46个馒头....
妈妈做了第47个馒头....
妈妈做了第48个馒头....
妈妈做了第49个馒头....
妈妈做了第50个馒头....
锅里馒头满了,等待儿子吃馒头....
大林吃了第50个馒头...
大林吃了第49个馒头...
大林吃了第48个馒头...
大林吃了第47个馒头...
大林吃了第46个馒头...
大林吃了第45个馒头...
大林吃了第44个馒头...
大林吃了第43个馒头...
大林吃了第42个馒头...
大林吃了第41个馒头...
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第51个馒头....
妈妈做了第52个馒头....
妈妈做了第53个馒头....
妈妈做了第54个馒头....
妈妈做了第55个馒头....
妈妈做了第56个馒头....
妈妈做了第57个馒头....
妈妈做了第58个馒头....
妈妈做了第59个馒头....
妈妈做了第60个馒头....
锅里馒头满了,等待儿子吃馒头....
大林吃了第60个馒头...
大林吃了第59个馒头...
大林吃了第58个馒头...
大林吃了第57个馒头...
大林吃了第56个馒头...
大林吃了第55个馒头...
大林吃了第54个馒头...
大林吃了第53个馒头...
大林吃了第52个馒头...
大林吃了第51个馒头...
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第61个馒头....
妈妈做了第62个馒头....
妈妈做了第63个馒头....
妈妈做了第64个馒头....
妈妈做了第65个馒头....
妈妈做了第66个馒头....
妈妈做了第67个馒头....
妈妈做了第68个馒头....
妈妈做了第69个馒头....
妈妈做了第70个馒头....
锅里馒头满了,等待儿子吃馒头....
大林吃了第70个馒头...
大林吃了第69个馒头...
大林吃了第68个馒头...
大林吃了第67个馒头...
大林吃了第66个馒头...
大林吃了第65个馒头...
大林吃了第64个馒头...
大林吃了第63个馒头...
大林吃了第62个馒头...
大林吃了第61个馒头...
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第71个馒头....
妈妈做了第72个馒头....
妈妈做了第73个馒头....
妈妈做了第74个馒头....
妈妈做了第75个馒头....
妈妈做了第76个馒头....
妈妈做了第77个馒头....
妈妈做了第78个馒头....
妈妈做了第79个馒头....
妈妈做了第80个馒头....
锅里馒头满了,等待儿子吃馒头....
小林吃了第80个馒头...
小林吃了第79个馒头...
小林吃了第78个馒头...
小林吃了第77个馒头...
小林吃了第76个馒头...
小林吃了第75个馒头...
小林吃了第74个馒头...
小林吃了第73个馒头...
小林吃了第72个馒头...
小林吃了第71个馒头...
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第81个馒头....
妈妈做了第82个馒头....
妈妈做了第83个馒头....
妈妈做了第84个馒头....
妈妈做了第85个馒头....
妈妈做了第86个馒头....
妈妈做了第87个馒头....
妈妈做了第88个馒头....
妈妈做了第89个馒头....
妈妈做了第90个馒头....
锅里馒头满了,等待儿子吃馒头....
小林吃了第90个馒头...
小林吃了第89个馒头...
小林吃了第88个馒头...
小林吃了第87个馒头...
小林吃了第86个馒头...
小林吃了第85个馒头...
小林吃了第84个馒头...
小林吃了第83个馒头...
小林吃了第82个馒头...
小林吃了第81个馒头...
锅里没有馒头了,等待妈妈做馒头....
锅里没有馒头了,等待妈妈做馒头....
妈妈做了第91个馒头....
妈妈做了第92个馒头....
妈妈做了第93个馒头....
妈妈做了第94个馒头....
妈妈做了第95个馒头....
妈妈做了第96个馒头....
妈妈做了第97个馒头....
妈妈做了第98个馒头....
妈妈做了第99个馒头....
妈妈做了第100个馒头....
小林吃了第100个馒头...
小林吃了第99个馒头...
小林吃了第98个馒头...
小林吃了第97个馒头...
小林吃了第96个馒头...
小林吃了第95个馒头...
小林吃了第94个馒头...
小林吃了第93个馒头...
小林吃了第92个馒头...
小林吃了第91个馒头...
大林吃了:60个馒头!
小林吃了:40个馒头!

消费者(儿子)类,也可以改写为实现Callable接口实现。程序如下:


public class Pot {
    private Stack<Integer> pot = new Stack<Integer>();
    public static final int MAX_LEN = 10; //锅最多只能放10个馒头。
    public static final int MAX_NUMBER =100; //妈妈一共做100个馒头,程序就结束了。
    public static int COUNT = 0; //统计现在做了多少个馒头。

    public Stack<Integer> getPot() {
        return pot;
    }

    //做馒头的方法
    public synchronized void makeCake(){

        COUNT++;
        pot.push(COUNT);
        System.out.println("妈妈做了第" + COUNT + "个馒头....");
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //吃馒头的方法
    public synchronized  void eatCake(String name){
        int temp = pot.pop(); //弹出一个馒头。
        System.out.println(name+"吃了第" + temp + "个馒头...");
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//生产者类

public class Producer implements Runnable {
    private Pot pot;

    public Producer(Pot pot) {
        this.pot = pot;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (pot) {
                //不停的做馒头。。。
                //判断什么时候就不做了。
                if (Pot.COUNT >= 100) {
                    break;
                }
                //怎么判断锅填满了呢。
                if (pot.getPot().size() == Pot.MAX_LEN) {
                    //妈妈就进入等待队列了,等待儿子吃馒头。
                    System.out.println("锅里馒头满了,等待儿子吃馒头....");
                    try {
                        //可以换线儿子吃馒头。
                        pot.notifyAll();
                        pot.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //可以换线儿子吃馒头。
                    pot.notifyAll();
                    pot.makeCake();
                }
            }
        }
    }
}

//消费者类
import java.util.concurrent.Callable;

public class Consumer implements Callable<Integer> {

    private Pot pot;
    private String name;

    public Consumer(Pot pot,String name) {
        this.pot = pot;
        this.name = name;
    }

    @Override
    public Integer call() throws Exception {
        int counter = 0;
        while(true){

            synchronized (pot) {
                //儿子不停的吃馒头。
                //判断什么时候不能了。
                if (pot.getPot().size() == 0 && Pot.COUNT >= 100) {
                    break;
                }

                //判断锅里有馒头吗?
                if (pot.getPot().size() == 0) {
                    //妈妈就进入等待队列了,等待儿子吃馒头。
                    System.out.println("锅里没有馒头了,等待妈妈做馒头....");
                    try {
                        //可以换醒妈妈做吃馒头。
                        pot.notifyAll();
                        //pot.notify();
                        pot.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    pot.eatCake(this.name);
                    counter++;
                    pot.notifyAll();
                }
            }
        }
        return counter;
    }
}

//测试类
import java.util.concurrent.*;

public class Test {
    //阻塞式关闭线程池。
    public static void awaitAfterShutdown(ExecutorService threadPool) {
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(200, TimeUnit.SECONDS)) {
                threadPool.shutdownNow();
            }
        } catch (InterruptedException ex) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {

        Pot pot = new Pot();
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 10, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Producer mother = new Producer(pot);
        Consumer bigSon = new Consumer(pot, "大林");
        Consumer smallSon = new Consumer(pot, "小林");

        threadPool.execute(mother);
        //submit提交Callable类型的任务...
        Future<Integer> bigSonTask = threadPool.submit(bigSon);
        Future<Integer> smallSonTask = threadPool.submit(smallSon);
        awaitAfterShutdown(threadPool); //优雅地等待线程池的结束,使用阻塞方法,保证线程结束后显示结果。
        try {
            //应该是这里出现死锁了....
            Integer bigSonNum = bigSonTask.get();
            Integer smallSonNum = smallSonTask.get();
            System.out.println("大林一共吃了:" + bigSonNum + "个馒头。");
            System.out.println("小林一共吃了:" + smallSonNum + "个馒头。");

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

6.线程池线程命名问题

我们可以通过 ThreadPoolExecutor 的构造方法的第五个参数(ThreadFactory),实现自定线程池中线程的名字。

我们自定义线程工厂类,返回自定义名称的线程对象。

例如:


ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 10, 3,
      TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadFactory() {
      final String[] profix={"张","李","王","刘","陈","孙"};
      int index = 0; //计数器.... //第一个线程就是:'小张',第二个就是'小李',以此类推.....
      @Override
          public Thread newThread(Runnable r) {
                return new Thread(r,"小"+profix[index++]);
          }
     },
  new ThreadPoolExecutor.DiscardOldestPolicy());

  threadPool.submit(r);  //第一个启动的线程名字叫:'小张'
  threadPool.submit(r);  //第二个启动的线程名字叫:'小李'