Java多线程基础--线程的创建与线程池管理

前言

各位亲爱的读者朋友,我正在创作 Java多线程系列 文章,如果您觉得内容还不错,还请点赞支持一下。

上一篇文章 中,我们回顾了线程生命周期、线程之间相互协作的知识,本篇我们继续挖掘,增强对线程的理解。

作者按:本篇按照自己有限的知识进行整理,如有谬误,还请读者在评论区不吝指出

了解系列以及总纲:Java多线程系列

重要声明

出于 方便叙述帮助基础尚且薄弱的读者理解文章内容 的目的,文中举了一些例子,但这些例子并 不能 百分百准确的对应Java中的概念,甚至有些幼稚。

读者朋友们应当注意到这一点,并且清晰的意识到自己的目标是理解Java中的概念与设计,而不必纠结于例子是否有失偏颇。

本篇博客的内容较为散碎,以下是内容大纲,您可以结合它挑选感兴趣的内容片段阅读、重新梳理知识

线程的创建与启动

在上一篇文章中,我们提到,调用 Thread#start() 即可启动该线程,而并未挖掘虚拟机 真正启动 一个线程的 具体过程

可能会让您失望,这一篇依旧不会挖掘这一细节,因为它对设计、编写优质的多线程应用毫无帮助。

如果您对此感兴趣,以下文章可能会有帮助:

先回归到概念:

操作系统中的Thread:是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位

JVM中的Thread:A thread is a thread of execution in a program. The Java Virtual Machine allows an application to have multiple threads of execution running concurrently. -- 似乎解释了,又似乎没有

"编写应用程序时,不会为了展现自己已经掌握了启动线程的知识而去启动线程",即线程是手段,这一点并不难理解。应用程序使用线程的目的在于 完成既定任务 , 并且基于多线程并发能力提高程序的运行效率、或者基于线程的特性界定职责边界使程序有序运行。

举个例子,工厂接了一批订单,需要在能力一致的一批工人中选择一批完成订单的生产,不难理解:工厂老板在意的是订单的完成,而不是工人的名字、星座、爱好。

那么如何定义线程需要完成的任务呢?

在最初的JDK中,存在两种方式:

  • 继承Thread类、覆写 run() 方法定义任务
class PrimeThread extends Thread {
    long minPrime;

    PrimeThread(long minPrime) {
        this.minPrime = minPrime;
    }

    public void run() {
        // compute primes larger than minPrime
    }
}
  • 组合优于继承的典型例子:实现Runnable接口,作为Thread的任务
class Foo {
    class PrimeRun implements Runnable {
        long minPrime;

        PrimeRun(long minPrime) {
            this.minPrime = minPrime;
        }

        public void run() {
            // compute primes larger than minPrime
        }
    }

    foo() {
        PrimeRun p = new PrimeRun(143);
        new Thread(p).start();
    }
}

随着JDK的发展,也有更多的方式定义任务,我们将在后续的系列文章中展开。

至此,您应该已经意会了 JDK doc 中所说的 a Thread is a thread of execution in a program

线程池

回到前文举得例子,工厂经过长时间的运转,积累了足够的经验,老板突然顿悟:只要工人能够胜任工作,自己完全没有必要了解工人,只需要:

  • 评估生产任务量
  • 制定好生产计划
  • 把任务和计划交给产线即可

完全不用在意是张三做还是李四做。

工人形如线程,产线便形如线程池。结合工厂的实际情况与任务的特性,可以凝练出 几种产线管理方式

在Java中,直接或者间接的依靠配置 ThreadPoolExecutor 获得线程池。

作者按:通过简单的搜索,可以发现大量的探讨线程池的博客,可能受面经影响,部分博客均围绕几个常见地面试问题展开。 但务必注意,线程池的知识内容远不止面试题题干所表现的那些内容!相比之下,理解线程池的设计更为重要

而我的文字功力有限,无法像教科书那样,顺着严谨的大纲递进式展开,还让文字显得 深刻且有趣,只能尽可能推测读者的兴趣点,展开以下内容

接下来,让我们结合生活经验,以工厂产线为例子,反思推导线程池的设计,了解 ThreadPoolExecutor 最基本的知识。

ThreadPoolExecutor 核心设计

上文中,我们以 产线 类比 线程池"工厂对工人的管理方式" 来类比 "线程池的管理设计" ,并且您一定注意到两处重点:任务工人

在线程池中, 上岗工作的线程 可以类比为 工人 ,完成产线收到的任务。

注意,该类比并不完全准确

不难推测,线程池存在两个核心内容:

  • 任务队列 BlockingQueue<Runnable> workQueue
  • 工作者集合 HashSet<Worker> workers

任务队列用于存储任务,您应该已经注意到,它使用的是juc下的 BlockingQueue 接口。它的本质还是队列,附加了两种特殊的操作:

  • 取 时满足 (或等待至满足) 队列非空
  • 存 时满足 (或等待至满足) 队列有空余空间

既然是接口,自然可以有不同的实现,您可以使用不同的实现作为线程池的任务队列。

在线程池设计中,通过依赖抽象 即BlockingQueue 进行了解耦,只关心存取的时机。您可以自行决定队列的特性,诸如大小、存储方式、优先级排序等

在先前的系列文章中还未涉及 BlockingQueue ,计划将于后续系列文章中展开,故本文也不会围绕它展开内容

接下来,让我们看一看 一个人上岗成为产线工人的全过程 ,即 Thread 成为线程池 Worker 的过程

成为线程池中的工作者

产线确定了一个岗位,管理者把岗位信息给到人力资源部门,并申请配给人力:getThreadFactory().newThread(this)

人力资源部门派遣了一个 Thread 小T 给到产线,小T已经接受了技能培训,并且知道上岗后从产线的 任务队列 中取任务、出卖体力完成它即可, void runWorker(Worker w)

小T 就成为了一个 Worker

您可能意识到,线程池只关心线程的管理,并不关心线程的创建细节,所以再次依赖抽象,对线程创建细节进行了解耦:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

关于Worker的定义、职责,泛读以下源码即可了然于心:

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        //其他略
    }

    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                //锁处理和判断略
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    }
                    //异常略
                    finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
}

您可能留意到,近年来的一些社会不良事件中总会出现一个词:临时工。当然,我并没有任何在此讨论社会问题的意愿, 一条生产线有固定的岗位,也会有按需产生的临时工岗位,甚至完全是临时工岗位,这并不难理解。而线程池也有这样的设计。

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;
}

注意,线程池在管理时,对线程是一视同仁的,只关心核心线程数量、最大线程数量,并不会依据线程的特征将其分化为 核心/非核心。

产线如何确定一个岗位

上一节我们提到,产线确定了一个岗位后,向人力资源部门要人并安排成为产线工人。

那么产线是如何确定一个岗位的呢?即 线程池如何确定需要增加一个Worker

产线不会无缘无故的安排工人上岗工作,必然是收到了生产任务,否则就成了合理摸鱼。

顺理成章的,线程池收到一个任务时,在相应的Size限制没有达到时,优先考虑安排线程进行处理,而不是丢到任务队列中等待。

在先前的系列文章中,我们已经了解到,启动线程是较为昂贵了,虽然线程池规划了 核心线程的数量最大线程数量 ,但也不会一开始就全员上岗,而是在任务抵达时逐步的安排线程上岗。

尝试 安排线程上岗 时,

  1. 需先判断线程池工作状态,如果线程池已经关闭,自然不会再增加线程,返回失败。比如产线准备停产了,已经接的任务会安排处理,但肯定不会再招工。
  2. 如果线程池正常工作,则检查线程数量是否可以继续增加
  3. 如果可以继续增加,则尝试更新线程总数,如果失败,则说明在其他线程中也触发了addWorker逻辑,那么线程池的工作状态也可能发生了改变,如果没有改变,则重复步骤2,否则回到步骤1继续检测
  4. 上一节中的内容,得到Thread实例并让其成为Worker,开始干活

整个过程中有CAS操作,鉴于有系列文章的撰写计划,文中不再展开,相应代码可参考gist:

参考Gist

向产线(线程池)下达任务

联想一下,市场部门小王拿到了一笔单子,来到产线找到负责人老张,让老张安排干掉

  1. 老张拿起了职工工作排期表,发现 还有固定岗位空着 ,则直接向人力资源要人上岗干活;
    1. 注意,招人上岗 可能失败 ,比如老张手上的信息不及时,现在已经满额了,也有可能厂长决定产线要停掉,通知人力资源不要再派人了
  2. 假如固定岗位招不来人,老张继续核实: 产线未停产可纳入计划,在未停产且可纳入计划的情况下,把任务排进了计划
    1. 老张还是很严谨的,又再次核实产线生产状态,如果产线已经停产并且该任务没有被领取,则把任务拒掉
    2. 否则检查工人是否在岗,如没有工人在岗,则向人力资源部门要一个 临时工 处理 任务队列中的任务如果要不来人,任务也放着
  3. 如果产线停产了或者排不进计划了,老张精通人情世故,表示看看能不能拉个临时工来,能拿到人就直接处理,否则就只能拒绝了,
    1. 如果是产线停了或者达到了最大人数,则要不来人 -- 参考上个小节
    2. 否则临时工会处理该任务

而线程池中与此过程也非常类似,代码比较简短:

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        if (command == null) // 小王在忽悠人
            throw new NullPointerException();

        int c = ctl.get();

        //1.
        if (workerCountOf(c) < corePoolSize) {
            //1.i
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2.
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) // 2.i
                reject(command);
            else if (workerCountOf(recheck) == 0) // 2.ii
                addWorker(null, false);
        } else if (!addWorker(command, false)) // 3
            reject(command); // 3.i
    }
}

当你拒绝任务

虽然打工人和老板都想任务及时可靠的被完成,但总有不如意的时候,某些情况下,线程池将不得不拒绝任务。

上文中已经提到:线程池已经关闭、任务队列已经排满。

当线程池拒绝任务时,事情总得有个说法,JDK设计了接口:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

在线程池实例化时,需要指明拒绝策略。

JDK中提供了4个策略:

  • CallerRunsPolicy -- 在线程池未关闭情况下,访问者线程直接负责处理
  • AbortPolicy -- 抛出 RejectedExecutionException 异常,这是运行时异常, 默认策略
  • DiscardPolicy -- 这个任务就此罢休
  • DiscardOldestPolicy -- 只要线程池没关闭,这件事情就非得干,把排在最前的任务踢掉,重走任务下达流程

蓦然回首,回看构造函数

ThreadPoolExecutor 提供了一系列重载构造函数用于获取特定实例

img.png

public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler
    ) {
        //ignore 参见gist https://gitee.com/leobert_253/codes/wh495q63tvlipum2snca131
    }
}

以此为例

  • int corePoolSize, 核心线程数量
  • int maximumPoolSize, 最大线程数量
  • long keepAliveTime, 配合 unit 表示的时间,作为IDLE 线程等待任务的超时时间,核心线程如果不允许采用超时机制将一直等待任务(默认)
  • TimeUnit unit, 配合 keepAliveTime
  • BlockingQueue workQueue 任务队列
  • ThreadFactory threadFactory 线程创建工厂
  • RejectedExecutionHandler handler 拒绝任务时的策略

当然,这些参数存在一些限制和校验,可参考 gist 进一步阅读,摘自JDK1.8。

线程池的状态标识-ctl的设计

前面的内容中已经提到了线程池Shutdown的状态,线程池具有5个状态,先看一眼代码: 相应的二进制补码已标识

注意,计算机中以补码表示数,如果是有符号数,最高位表示符号,1为负、0为非负,非负数其原码和补码一致,负数的补码: 取原码,符号位不变(保持1),其他位取反,然后加1 得到补码

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    /**
     * 即29
     * */
    private static final int COUNT_BITS = Integer.SIZE - 3;

    /**
     * 0010 0000 00000000 00000000 00000000 -1 => 
     * 0001 1111 11111111 11111111 11111111
     *
     * wc = ctl & CAPACITY 低29位存储wc
     * state = ctl & ~CAPACITY 高三位存储状态
     * */
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    //补码 111 29个0
    private static final int RUNNING = -1 << COUNT_BITS;
    //补码 000 29个0
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    //补码 001 29个0
    private static final int STOP = 1 << COUNT_BITS;
    //补码 010 29个0
    private static final int TIDYING = 2 << COUNT_BITS;
    //补码 011 29个0
    private static final int TERMINATED = 3 << COUNT_BITS;

    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
}

很显然,ThreadPoolExecutor 将32位的int分为不同的区域标识信息,在多线程背景下,使用AtomicInteger,但本质还是int

将 32位中的 高三位用于存储状态信息,低29位存储 worker-count(即wc),信息存储于ctl中,非常传统的位运算设计。

考虑到篇幅和阅读体验,其他相关的位运算API已经剥离到 gist , 通过位运算解出状态、wc,判断状态大小等

  • Running 运行状态(实例化后的默认状态)
  • Shutdown 不接收新任务,处理任务队列中的任务
  • Stop 不接收新任务,不处理任务队列中的任务,并且中断正在处理的任务
  • Tidying 所有的任务已终止时的一个暂态,随后将执行terminate(),成功后进入Terminated状态
  • Terminated 彻底终止

生命周期变化如下:

img.png

线程池目的

此时,请您想一想,创建线程池机制的目的是什么?

前文提到,线程池是对线程进行管理,显然还不是根源。

  • 降低资源消耗。 重复利用已创建的线程,可降低线程创建和销毁造成的消耗。
  • 提高响应速度。 线程池中有线程值守,当任务到达时,不需要每次都等待线程创建。注意,并不排除任务排队、必要的线程启动情况
  • 提高线程的可管理性,对系统运行状态进行调优。 线程是稀缺资源,不能无限制的创建,使用线程池可以进行统一的分配、监控、调优。

面经常客,JDK中提供的线程池

您一定阅读过一些面经,其中包含线程池的题目。作者可能在引导您向着 "JDK中特定的API所提供的线程池特征" 方面展开作答,或者题目看起来就是这样,也许就是一个面试陷阱

作者按,不要单纯的为了应付面试和放弃了学习的初心。结合问题 讲清楚线程池的设计 要比 单纯的、枯燥的罗列通过调用Executors中的API得到的线程池对应的特征 有意义

在JDK1.5中,Java凝练了4种配置方式,可获得特定管理方式的线程池:

Java依据其特征作为Executors中的方法命名,借用它们作为这4类线程池的别名

  • FixedThreadPool 数量固定、线程可重用
  • SingleThreadExecutor 仅单个线程
  • CachedThreadPool 会根据需要创建新线程的线程池
  • ScheduledThreadPool 可定期或周期执行任务的线程池

前文已经提到,它们直接或者间接的使用了 ThreadPoolExecutor,而不是4个继承类!按照其API命名给了它们别名,但并不是类名!

FixedThreadPool

JDK中提供的包装方法如下:

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }
}

很显然,FixedThreadPool 是一个定额的池,nThreads 即为核心线程数量,亦为最大线程数量,注意值必须大于0。

这条产线的工人就很惨,活多了也不会加派人手,任务排队等待线程空闲;不来活也要在岗位上待着,不会释放线程。

LinkedBlockingQueue 作为任务队列,先到的任务先被处理,并且它是无界的。

SingleThreadExecutor

顾名思义,我们会得到一个单一线程的线程池。

public class Executors {
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>(),
                        threadFactory));
    }
}

可能您会疑惑,

new ThreadPoolExecutor(
    1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory
)

已经得到了一个 核心线程数、最大线程数均为1的线程池,为啥要增加 FinalizableDelegatedExecutorService 的参与?

前文未提及但您可能知道,ThreadPoolExecutor是可以重新配置的!例如重新设置核心线程数量:

public class ThreadPoolExecutor {
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

}

FinalizableDelegatedExecutorService 继承自 DelegatedExecutorService,扩展了在 finalize() 时关闭线程池。 而后者是一个Wrapper,仅暴露 ExecutorService 接口的功能,通过委托的方式封闭了重新配置线程池的能力。

CachedThreadPool

该池将使用 "线程对象" 缓存方案,核心线程数量为0,全部为临时工,并且基于上文的知识:

  1. 如果池中没有Worker,则会新增Worker处理,否则任务放入任务队列等待。
  2. 非核心线程可以运用获取任务超时时间,当获取任务超时时,则 processWorkerExit 下岗
public class Executors {
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }
}

ScheduledThreadPool

public class Executors {
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

不同于前三者,此时得到的线程池可 定时 处理任务。

为了实现这一点,ScheduledThreadPoolExecutor

  • 使用 DelayedWorkQueue 改变了获取任务的具体实现
  • 使用装饰模式包装原始任务,使得任务在满足 周期性 的条件时,能够重新进入任务队列

作者按:JDK中的源码实现非常精彩,值得深读。gist


相信您已经对线程池的设计有了一定的理解,JDK1.8之后也在线程池中增加了Future相关的内容,本文不再继续展开。通过Executors中API的源码,应当已经掌握得到的线程池的特征。

意犹未尽之处

行文至此,内容已经非常冗长,但也不得不告一段落。 文中的部分内容,例如Future、AtomicInteger、CAS等内容,计划在本系列的其他文章中具体展开,文中亦颇多回避。 而DelayedQueue、线程池生命周期变化时的具体细节、线程池的调优等内容,均需要结合代码、场景具体分析,限于文章主题未能尽兴,读者闲暇之余若能将源码再阅读一二,定能有更大的收获。