Java线程池源码解析

多线程开发中常用到线程池, 你是否还停留在fix, cache, single几种线程池的简单使用? 是时候深入一下源代码了

讲线程池, 首先简单看看Java中单个线程的实现方式

Thread & Runnable/Callable

这3个接口是所有多线程开发的基础. Runnable和Callable是对任务的抽象, 分别代表了有返回的和无返回的任务类型. 各自只定义一个方法, 就是这个任务该做什么,

public interface Runnable {
    public abstract void run();
}

public interface Callable<V> {
    V call() throws Exception;
}

有了任务之后, 我们当然可以让main线程来处理它, 但是在设计上, 这两个接口是为了让子线程调用做的. 怎样启动子线程来执行任务? 这就需要用上Thread类, 相信大家再熟悉不过, 它实现了Runnable接口, 内部定义了如何启动新线程的native方法start0(封装在start方法内), start0启动了子线程后会在子线程内回调run方法, 从而实现了子线程的方法调用.
native方法是如何启动新线程, 怎样跟操作系统线程映射的, 我们这里咱不做深入.

线程池

由于启动一个新的线程必须通过Thread, 调用native方法, 触及到操作系统层面的开销往往很大. 通过线程池, 可以缓存部分线程, 复用已有线程, 避免频繁调用native方法, 是最主要的目标.

线程池 vs 连接池

通过池化复用资源是开发中常见的性能优化方案, 在实施中我们通常面对两类池: 线程池和连接池. 但虽然名字都叫池, 但在底层各自讨论的其实不是一个概念.
对于线程池, 池化的对象是线程, 线程的本质是CPU的运行时间片, 池化线程, 其实是提前申请一批CPU时间片, 在时间片到来时, 直接处理任务即可. 好处是不需要每次处理任务的时候再单独申请.
对于连接池, 池化的对象是连接, 而连接的本质是一个句柄(引用), 它是通过连接信息, 经过协议握手之后保留下来的可直接使用的执行入口. 池化连接, 可以避免频繁的协议握手操作.

思考一下, 如果让我们自己实现一个线程池, 我们会怎么做? 下面是一个简单的demo.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

public class SimpleThreadPool {

    // 线程池关闭标志
    private boolean shutdown;

    // worker线程运行信号
    private CountDownLatch countDownLatch;

    // 任务队列
    private ConcurrentLinkedQueue<Runnable> taskQueue
            = new ConcurrentLinkedQueue<>();

    public SimpleThreadPool(int threadNum) {
        countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            // 定义并启动工作线程
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (!taskQueue.isEmpty() || !shutdown) {
                            Runnable r = taskQueue.poll();
                            if (r == null) {
                                Thread.sleep(100);
                                continue;
                            }
                            r.run();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
    }

    public void submit(Runnable runnable) {
        taskQueue.add(runnable);
    }

    public void shutdown() throws InterruptedException {
        shutdown = true; // 关闭线程池标志
        countDownLatch.await(); // 等待worker线程退出
    }

}

public class SimpleThreadPoolDemo {

    public static void main(String[] args) 
            throws InterruptedException {
        SimpleThreadPool simpleThreadPool 
                = new SimpleThreadPool(2);
        class PrintRunnable implements Runnable{
            @Override
            public void run() {
                System.out.println("current thread:" + 
                        Thread.currentThread().getId());
            }
        }
        for (int i = 0; i < 10; i++) {
            simpleThreadPool.submit(new PrintRunnable());
        }
        simpleThreadPool.shutdown();
    }

}

麻雀虽小, 五脏却全, SimpleThreadPool已经包含了线程池所需要的几大要素, 并且在实际开发中常用的ThreadPoolExecutor线程池里也有1比1的对应定义:

  • 运行时状态, SimpleThreadPool只有shutdown关闭标记, workQueue保存当前的工作线程. 而ThreadPoolExecutor提供了更加丰富的运行时状态, 更好地监控线程池运行状态的同时, 也可以对线程池实施更细粒度的控制.
  • 任务队列(SimpleThreadPool.taskQueue -> ThreadPoolExecutor.workQueue) 用于保存待执行的任务
  • 工作线程, 在SimpleThreadPool中采用的是固定数量的线程, 每个工作线程并发地从任务队列中获取任务执行, 而ThreadPoolExecutor支持丰富的自定义方案, 能够根据配置动态调整工作线程的数量.
  • 并发控制, 这个在我们的SimpleThreadPool中做的很简陋(甚至难免会有bug), 而ThreadPoolExecutor则使用更加精妙的方法实现并发安全控制.

ThreadPoolExecutor

行文至此, 本文的主角终于登场. ThreadPoolExecutor是常用的线程池实现类, 可能开发中并不会直接面对它, 但我们更熟悉的Executors底层调用的就是这个类. 使用后者创建不同类型的线程池无非是使用不同的参数实例化ThreadPoolExecutor实例.

继承关系 & 属性

下面是ThreadPoolExecutor的类继承图, 可以清晰地看出功能继承关系.
Executor是顶层接口, 根据前面的分析可知它是处理任务(Runnable)的接口, 这是ThreadPoolExecutor最基础的的功能.
ExecutorService接口定义了线程池基础的一些方法, 包括:
1. 关闭方法(shutdown()shutdownNow()), 区别在如何对待正在执行/排队的任务, 是关闭它, 还是等待执行完成
2. 提交单个任务的方法(submit(Runnable, T):Future <T>, submit(Callable<T>):Future<T>), 和提交批量任务的方法invokeAll(), 定义了提交两种类型任务的方法, 返回的是一个(批量是多个)Future对象, 在ThreadPoolExecuter中返回的是实现类FutureTask, 支持异步对任务进行执行/等待/停止.
AbstractExecutorService抽象类实现了核心的submit,invokeAny, invokeAll方法, 实现的本质都是将Runnable/Callable封装成FutureTask对象, 调用抽象方法execute(异步方法, 不阻塞), 最后返回FutureTask对象给调用方.

// AbstractExecutorService
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 调用newTaskFor封装成FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask); // 这是子类实现各自逻辑的入口方法, ThreadPoolFactory
                    // 中的实现方法看后文
    return ftask;
}
ThreadPoolExecutor继承图

下图是ThreadPoolExecutor定义的所有属性:

ThreadPoolExecutor定义的属性

基本上都见名知意, 这里摘几个重要的赘述一下:

  • corePoolSize, maximumPoolSize, largestPoolSize.
    这几个是线程池大小的核心配置, corePoolSize决定了核心线程数, 核心线程即便在没有任务的时候也不会回收. maximumPoolSize则是允许最大的线程数, 当任务数大于corePoolSize时, 会临时增加工作线程. 超过corePollSize小于maximumPoolSize的线程会在闲置一段时间后被回收. largestPoolSize则是线程池实例达到过的最高工作线程数, 利用它, 结合workers实际大小可以对线程池的指标进行监控和调优.
  • workQueue, workers
    workQueue是任务队列, Executor底层默认通过LinkedBlockingQueue实现.
    workers是工作线程集合
  • allowCoreThreadTimeOut, keepAliveTime
    当allowCoreThreadTimeOut为true时, 核心线程空闲时间超过keepAliveTime也会被回收
  • threadFactory
    线程工厂, 当需要新增工作线程的时候, 通过工厂来创建. Executors默认使用自己的DefaultThreadFactory实现.

以上几个就是线程池最核心的几个配置, Executors创建线程池修改的主要就是这几个(如果不是全部的话)参数. 下面就来看看依赖这些参数, 线程池是如何运行的.

运行时

启动线程池

ThreadPoolExecutor源码中是没有start相关方法的, 构造方法所做的也只是初始化一些对象属性

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 参数合法性校验
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 赋初值
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

除此之外, ThreadPoolExecutor还定义了运行状态变量ctl, 这是一个int类型(32bit长度), 高3位保存运行状态, 低位用来保存运行中的工作线程数. 所以线程池最大能够支撑的线程数为2^29-1.

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 运行中
private static final int RUNNING    = -1 << COUNT_BITS;
// 已关闭, 工作线程还在运行, 只是不接收新任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 已关闭, 强制中断运行中的工作线程, 同样不接收新任务
private static final int STOP       =  1 << COUNT_BITS;
// 所有的工作线程已停止, 等待执行terminated方法(默认为空方法, 给子类实现功能)
private static final int TIDYING    =  2 << COUNT_BITS;
// 已执行terminated方法
private static final int TERMINATED =  3 << COUNT_BITS;

// 封装和解析ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

添加任务 & 增加工作线程

创建线程池对象之后, 并无start方法来启动它(意味着此时并没有工作线程运行), 要让线程池工作起来须添加任务, 对应的方法是AbstractExecutorService.sumbit(前文已经提过). submit方法会生成一个FutureTask对象, 调用子类实现的execute(Future)方法. 在execute方法中会完成增加并启动工作线程的工作.

// ThreadPoolExecutor
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 线程池运行状态 
    int c = ctl.get();
    // 工作线程数小于核心线程数, 添加一个新工作线程, 并将command作为第一个任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 工作线程新增失败时, 将任务添加到任务队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 由于方法未加锁, 所以做二次判断, 保证系统关闭时正确回调reject方法, 
        // 线程池运行时至少有一个工作线程
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 添加至队列失败,回调RejectedExecutionHandler,默认的实现是抛出对应异常
    else if (!addWorker(command, false))
        reject(command);
}

addWorker()方法所做的, 是在一系列的状态检查(包括线程数限制, 线程池运行状态判断等等)通过之后将Runnable封装成Worker对象, 添加到workers工作线程集合, 最后启动它. 也就是说, 真正的工作线程是被封装在Worker对象中的. Worker对象继承自AQS类, 以支持加锁操作; 实现了Runnable接口, run方法是子线程中执行的入口.

// ThreadPoolExecutor内部类
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{

    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks; // 工作线程完成的任务数, 失败的也计算在内

    Worker(Runnable firstTask) {
        setState(-1); // 设置父类AQS的state, 用于防止错误中断
        this.firstTask = firstTask;
        // 这里是关键!! 
        // 通过ThreadFactory来创建新的线程,入参为this(implements Runnable), 所以外层调用会通过this.thread.start()启动此线程, 子线程内实际执行的方法是run()
        this.thread = getThreadFactory().newThread(this);
    }

    // 工作线程的核心操作在runWorker中
    public void run() {
        runWorker(this);
    }

    // ...
    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // 创建Worker对象的时候带进来的初始Task
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 反复尝试从任务队列中获取任务, getTask方法是阻塞方法, 当返回null时表示等待超时
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 通过中断关闭线程, 后文再做分析
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // ThreadPoolExecutor默认为空方法, 留给子类实现功能
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 调用真实的业务方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // ThreadPoolExecutor默认为空方法, 留给子类实现功能
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 工作线程结束, 将线程从workers中移除. 
            processWorkerExit(w, completedAbruptly);
        }
    }

}

关闭线程池

Runnable任务添加到线程池之后, 子线程开始不断从任务队列获取新的任务来执行. 当长时间有任务空闲, 非核心线程就需要关闭. 而当外部调用关闭线程池时, 所有子线程都需要关闭. 他们如何关闭? 下面来讨论

先来看工作线程超时关闭机制. 线程运行过程中, ThreadPoolExecutor.Worker#runWorker()方法中有一个getTask方法, 它负责不断从工作线程队列获取新的Runnable来执行, 当getTask返回null时, 线程结束.

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 下面这个条件判断比较绕, 可以将其转换一下:
        // ((rs >= SHUTDOWN && rs >= STOP) || 
        //  (rs >= SHUTDOWN && workQueue.isEmpty()))
        // 它的意思是:
        // 1.在线程池已SHUTDOWN(停止接受新任务)的前提下, 如果STOP(不再处理
        // 排队中的任务)返回null, 以令当前线程停止;
        // 2.在线程池已SHUTDOWN(停止接受新任务)的前提下, 如果工作队列为空, 当前 
        // 线程返回null, 线程停止(因为不会再有新的任务进来了)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 工作线程数
        int wc = workerCountOf(c);

        // 获取新的Task超时,当前循环方法退出,线程即将关闭:
        // 1.当线程数超过核心线程数,直接退出
        // 2.当线程数未超核心线程数(即剩下的全是核心线程),根据配置来选择是否关闭
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 满足上面条件时, 通过CAS减少工作线程数, 返回null关闭当前线程
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 从任务队列中获取任务,这里的keepAliveTime就是创建线程池的时候配置的核心线程最大存活时间
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

关闭线程池有两个方法shutdownshutdownNow`, 前者只是把ctl状态改为SHUTDOWN, 等待工作线程在下次getTask时发现关闭状态主动终止. 而后者则会主动中断workers中的所有线程.比较简单. 不再赘述.

以上, 就是我们ThreadPoolExecutor的生命周期.

发表评论

电子邮件地址不会被公开。

+ 83 = 91