多线程并发系列(九):线程池ThreadPoolExecutor详解与应用

star2017 1年前 ⋅ 310 阅读

Java 应用中对宝贵的稀缺资源池化是保障系统稳定运行,优化系统响应速度的重要手段。

线程池的运用场景非常广,几乎所有需要异步或并发执行任务的程序都可以使用线程池。

合理使用线程池能带来 3 个好处:

  • 降低资源消耗:通过重复利用已创建的线程,降低创建线程和销毁线程产生的消耗。

  • 提高响应效率:当任务到达时,不需要等待线程创建就能立即执行。

  • 提高线程的可管理性:线程是稀缺资源,不能无限创建,否则会消耗系统资源,降低系统稳定性。

    使用线程池可以进行统一分配、调优、监控。

线程池处理流程

线程池处理主要流程图

当提交一个新任务时,线程池的处理流程如上图:

  1. 判断核心线程池是否都在执行任务,如果否,则创建一个线程执行任务;如果是,进入下个流程。
  2. 线程池判断工作队列是否已满,如果否,则将新任务加入到工作队列中;如果是,进入下个流程。
  3. 判断线程池是否都处理工作状态,如果否,则创建一个线程执行任务;如果是,则交给饱和策略来处理此任务。

ThreadPoolExecutor

ThreadPoolExecutor 执行 execute() 方法的示意图:

ThreadPoolExecutor执行示意图

ThreadPoolExecutor 遇到下面 4 种情况会执行 execute() 方法:

  1. 如果正在运行的线程数小于 corePoolSize,则创建新线程执行任务(注意,要先获得全局锁)。
  2. 如果运行的线程等于或大小 corePoolSize,则将任务加入 BlockingQueue
  3. 如果 BlockingQueue 已满,则创建新的线程来处理任务(注意,要先获得全局锁)。
  4. 如果创建新线程将使当前运行的线程数超过 maximumPoolSize,任务将被拒绝,并调用 RejectExecutionHandler.rejectedExecution() 方法。

采用此设计的思路是在调用 execute()方法时,尽可能避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。

源码简析:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 工作线程数 < 核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //封装成工作线程 Woker(工作线程)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 运行态,尝试将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 使用尝试使用最大线程运行
    else if (!addWorker(command, false))
        //加入失败,拒绝处理
        reject(command);
}
  • execute()方法中创建一个线程时,会让这个线程执行当前任务。
  • 当前任务执行完后,会反复从 BlockingQueue 获取任务来执行。

线程池的使用

可以通过 ThreadPoolExecutor 的构造方法创建一个线程池。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //...........
}

构造方法参数

参数名 描述
corePoolSize 核心线程池数量,新提交任务将创建一个新线程执行任务,
即使其它核心线程空闲能够执行新任务也会创建线程,
等到需要执行的任务数大于核心线程数时就不再创建
maximumPoolSize 最大线程池数量,允许创建的最大线程数。
如果队列满了,且已创建的线程数,不超过该值时,
则会创建新的线程执行任务。
keepAliveTime 超过 corePoolSize 的线程的空闲时间,超过该时间则被销毁
即线程活动保持时间
TimeUnit keepAliveTime 时间单位,TimeUnit 是个枚举类。
workQueue 任务队列,保存等待执行的任务的阻塞队列
threadFactory 线程工厂,用于创建线程,可以给线程设置更有意义的名字。
使用 Guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置名称。
RejectedExecutionHandler 拒绝策略(饱和策略),当队列和线程池已满,必须采取的处理策略。

任务队列类型

BlockingQueue(任务队列):用于保存等待执行的任务的阻塞队列(Runnable 对象),可以选择以类型的阻塞队列。

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元数排序。

    若有新的任务需要执行,线程池创建新的线程直到线程数量达到 corePoolSize,则将新的任务加入到等待队列中。
    若等待队列已满,则创建线程执行,直到线程数量达到 maximumPoolSize,若大于 maximumPoolSize,则执行拒绝策略。超出 corePoolSize 的线程是有空闲存话时间的。

    注意,通过构造函数创建线程池, 构建 ArrayBlockingQueue 需要要传入容量,若容量足够大或没有达到超负荷的状态,线程数将一直维持在 corePoolSize 以下,若队列已满,则以 maximumPoolSize 为上限。

  • LinkedBlockingDeque:基于链表结构的无界阻塞队列,此队列按 FIFO 排序元素,吞吐量通常高于 ArrayBlockingQueue。构建 LinkedBlockingDeque 最好是传入容量。使用此队列,maximumPoolSize 参数将无效,即使队列中缓存了很多待执行的任务,当线程数达到 corePoolSize后就不再增加;若后续有新的任务进入,则直接加入队列等待。

    静态工厂方法 Executors.newFixedThreadPool() 使用这个队列。队列默认容量为 Integer.MAX_VALUE。如果任务提交速度持续大于处理速度,会造成队列大量阻塞,因队列很大,很可能在执行拒绝策略之前就内存溢出,所以需要注意任务提交与处理之间的协调控制。

  • SynchronousQueue不保存元素的阻塞队列,或称为直接提交队列,会马上提交执行。
    吞吐量通常要高于 LinkedBlockingQueue ,静态工厂 Executors.newCachedThreadPool 使用此队列。

    每执行一个插入操作就会阻塞,直到另一个线程调用删除操作才会被唤醒;反之每一个删除操作也都要等待一个插入操作。

  • PriorityBlockingQueue:具有优先级无限阻塞队列

    除了第一个任务直接创建线程执行外,其他的任务都会被放入了优先队列,按优先级进行重新排列执行,线程池创建的线程数不会超过 corePoolSize 的数量。

    其它队列按照先进先出的规则处理任务,而优先队列可以自定义规则根据任务的优先级顺序先后执行。

拒绝策略类型

RejectedExecutionHandler(拒绝执行策略),当队列和线程池都满时,说明线程池处于饱和状态,则必须采用一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。

  • AbortPolicy:直接抛出异常,阻止系统下一步工作。
  • CallerRunsPolicy:如果线程数达到上线,该策略会把任务队列中的任务放在调用者线程中运行。
  • DiscardPolicy:丢弃无法处理的任务,不处理。使用此策略,需要注意业务场景中是否允许任务丢失。
  • DiscardOldestPolicy:丢弃队列中最老的一个任务(最先被添加进队列,即最近要执行的任务),并尝试再次提交。

可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。如记录日志或持久化存储不能处理的任务。

向线程池提交任务

可以使用 execute()submit() 方法向线程池提交任务。

  • execute() 方法

    该方法提交的是不需要返回值的任务,所以也无法判断任务是否被线程池执行成功。如下示例:

    poolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            //
        }
    });
    
  • submit() 方法

    提交的任务需要返回值时使用该方法。submit() 方法的实现由 ThreadPoolExecutor 的抽象父类 AbstractExecutorService 提供。

    使用该方法,线程池会返回一个 Future 类型的对象,通过该对象可以判断线程是否执行成功。

    通过 future 的 get() 方法来获取返回值, get() 方法会阻塞当前线程直到任务完成。

    还有 get(long timeout, TimeUnit unit) 方法,会阻塞当前线程一段时间后立即返回,有可能任务还没执行完。

    使用示例如下:

    Future<Object> future = poolExecutor.submit(needReturnValuTask);
            try {
                Object o = future.get();
            } catch (InterruptedException e) {
                //处理中断异常
            } catch (ExecutionException e) {
                //处理无法执行任务异常
    
            } finally {
                //关闭线程池
                poolExecutor.shutdown();
            }
    

关闭线程池

线程池类 ThreadPoolExecutor 提供了 shutdown()shutdownNow() 两个方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止,但两者还是有一定的区别。

  • shutdown()

    调用该方法后,先设置线程池的状态为 SHUTDOWN,线程池会继续执行中的任务,中止正在待待的任务,但不接受新的任务,如果有新的任务进入,会抛出 RejectedExecutionHandler 类型错误。

    当调用 shutdown() 方法,会立即返回,而不会阻塞等待先前提交的任务完成执行。若要等待任务完成执行可以使用 awaitTermination() 方法。

  • shutdownNow()

    调用该方法后,先设置线程池的状态为 STOP,线程池会尝试停止所有正在执行的任务,中止正在等待的任务,并返回正在等待执行的任务列表。从此方法返回后,将从任务队列中清空(删除)这些任务。

    当调用 shutdownNow() 方法后, 线程池会尽最大努力(best-effort) 尝试停止正在执行的任务,但是没有保证的。

这两个方法任意一个被调用,isShutdown() 方法返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminating() 方法返回 true

合理配置线程池

要合理配置线程池,需要对任务特性从以下几个角度进行分析

  • 性质:CPU密集性、IO密集性、混合型。
  • 优先级:高,中,低。
  • 执行时间:长,中,短。
  • 依赖性:是否依赖其他系统资源,如数据库连接池。

  • 性质

    性质不同的任务可以创建不同规模的线程池分开处理。

    CPU 密集型任务应配置尽可能小的线程,如配置 N(cpu) + 1 个线程的线程池。

    IO 密集型任务并不是一直在执行,则应配置尽可能多的线程,如 2 * N(cpu)

    混合性任务可以拆分成一个 CPU密集型任务和一个 IO密集型任务,只要这两个任务执行的时间相关不是太大,那么拆分后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相关太大,则没必要进行折分。

    可以通过 Runtime.getRuntime().availableProcessors() 来获取当前服务嘎嘎的 CPU 个数。

  • 优先级

    优先级不同的任务可以使用 优先级队列(PriorityBlockingQueue)来处理,让任务按优先顺序执行。

    注意:如果一直有优先级高的任务提交到队列,则低优先级的任务可能永远不能被执行。

  • 执行时间

    执行时间不同的任务可以交给不同规模的线程池来处理,或者使用优先级队列,让执行时间短的任务先执行。

  • 依赖性

    依赖外部资源返回结果的,等待时间越长,则 CPU 空闲时间越长,那么线程数应设置得越大,这样才能更好地利用 CPU。

建议使用 有界阻塞队列(ArrayBlockingQueue),能增加系统的稳定性和预警能力,可以根据需要设置大一点,如几千。特别注意因依赖外部资源(如数据库操作)响应缓慢,而导致工作线程全部阻塞,线程队列和线程池全满的问题。若设置 无界限阻塞队列,使用的是默认容量,线程队列则可能持续堆积耗尽内存,导致整个系统不可用。

线程池工作具类:Utils-线程池和线程数管理工具类

线程池的监控

系统中使用大量的线程池,有必要对线程池进行监控,方便在出现问题时可以快速定位。

可以使用线程池提供的参数属性进行监控。

  • taskCount:需要执行的总任务数。正要执行的任务数 + 队列中的任务数 + 已完成的任务数,是个近似值,因线程状态可能被动态改变。
  • comletedTaskCount:已完成的任务数,小于或等于 taskCount
  • largestPoolSize:曾经创建过的最大线程数。通过这个数据可以知道线程池是否有满过。
  • getPoolSize:获取当前线程池中的线程数。
  • getActiveCount:获取正在执行任务的线程数,是个近似值,因为线程状态是动态改变的。

线程池监控:可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute(Thread t, Runnable r),afterExecute(Runnable r, Throwable t),terminated() 方法,这三个方法都是 ThreadPoolExecutor 的空方法,可在子类中对其进行自定义。

  • beforeExecute(Thread t, Runnable r)

    在执行给定的线程的 Runnable 之前调用该方法。

    由执行任务的线程调用,可用于重新初始化 ThreadLocal 或执行日志记录。

    注意:若有多级子类重写,子类应该在这个方法的未尾调用 super.beforeExecute()

  • afterExecute(Runnable r, Throwable t)

    给定的 Runnable 执行完成时调用的方法。

    由执行任务的线程调用。如果不为 null,说明出现了未捕获的 RuntimeExceptionError 导致执行突然被终止。

    注意:如果将任务动作显式或通过诸如 Submit 之类的方法包含在任务(如 FutureTask)中,则这些任务对象会捕获并维护计算异常,因此它们不会导致突然终止,并且内部异常不会传递给此方法。

    如果想使用此方法捕获两种类型的失败,可在子类中打印直接原因或潜在异常,如下示例:

    class ExtendedExecutor extends ThreadPoolExecutor {
        // ...
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>) {
                try {
                    Object result = ((Future<?>) r).get();
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null)
                System.out.println(t);
        }
    }}
    
  • terminated()

    当线程池被关闭时调用。

    注意:若有多级子类重写,子类应该在此方法内调用 super.terminated()

其它参数

  1. Java线程池实现原理及其在美团业务中的实践
  2. 深入浅出Java线程池
  3. 多线程系列文章
  4. 深入理解Java线程池:ThreadPoolExecutor
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: