2

Java线程池进阶

 2 years ago
source link: https://lesofn.com/archives/threadpool-advance
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Java线程池进阶

木小丰 2022年02月26日 933次浏览

线程池是日常开发中常用的技术,使用也非常简单,不过想使用好线程池也不是件容易的事,开发者需要不断探索底层的实现原理,才能在不同的场景中选择合适的策略,最大程度发挥线程池的作用以及避免踩坑。

一、线程池工作流程

以下是Java线程池的工作流程,涉及创建线程的参数及拒绝策略,如果读者对这部分内容不太了解,可参考其他的文档,本文不在赘述。

二、线程池进阶

1、线程池的创建

需要手动通过ThreadPoolExecutor创建,使用者要非常明确业务场景并定制线程池,避免误用可能导致的问题。

以下是阿里巴巴Java开发手册中的描述:

ThreadFactory:推荐使用guava中的ThreadFactoryBuilder创建:

new ThreadFactoryBuilder().setNameFormat("name-%d").build();

2、阻塞队列在线程池中的使用

很多同学一看到阻塞队列就自然的认为出入队列都是阻塞的,使用的阻塞队列也就没必要关心拒绝策略了,其实不然,阻塞队列在任务提交和任务获取阶段使用了不同的策略。

任务提交阶段:调用的阻塞队列的offer方法,这个方法是非阻塞的,如果插入队列失败会直接返回false,并触发拒绝策略;

获取任务阶段:使用的是take方法,此方法是阻塞的;

3、保证提交阶段任务不丢失

有三种方法:使用CallerRunsPolicy拒绝策略、自定义拒绝策略、使用MQ系统保证任务不丢失。

(1)CallerRunsPolicy拒绝策略

ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程处理

这种是最简单的策略,但需要注意的是如果任务耗时较长,会阻塞提交任务的线程,可能会成为系统瓶颈。

(2)自定义拒绝策略

既然Java线程默认使用的是offer提交任务,那我们可以自定义拒绝策略在任务提交失败时改为put阻塞提交。

缺点也是会阻塞提交线程,不过相比CallerRunsPolicy策略更能发挥多线程的优势。

RejectedExecutionHandler executionHandler = (r, executor) -> {​ executor.getQueue().put(r); } catch (InterruptedException e) {​ Thread.currentThread().interrupt();​ throw new RejectedExecutionException("Producer thread interrupted", e);

(3)配合MQ保证任务不丢失

使用默认的ThreadPoolExecutor.AbortPolicy策略,如果抛出RejectedExecutionException异常则返回给MQ消费失败,MQ会保证自动重试。

4、保证队列、未执行完成的任务不丢失

当服务停止的时候,线程池中队列和活跃线程中未执行完成的任务可能会造成数据丢失,首先说下结论:无论采取任何策略,在Java层都不能100%保证不丢,比如机器突然断电的情况。我们还是可以采取一定的措施尽量避免任务丢失。

(1)线程池关闭

线程池关闭有两个方法:

shutdownNow方法:线程池拒绝接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行,并抛出InterruptedException异常。

shutdown方法:线程池拒绝接收新提交的任务,同时等待线程池里的任务执行完毕后关闭线程池。

(2)注册关闭钩子

使用以下方法注册JVM进程关闭钩子,在钩子方法中执行线程池关闭、未处理完成的任务持久化保存等。

Runtime.getRuntime().addShutdownHook()

需要注意的是:钩子方法在使用kill -9杀死进程时不会执行,一般的杀进程的方式是先执行kill,等待一段时间,如果进程还没杀死,再执行kill -9。

要保证队列中的任务不丢失,需要消费队列中的数据,发送到外部MQ中;

保证未执行完成的任务不丢失,需要在抛出InterruptedException异常后,将任务参数保证到MQ中;

需要注意的是:1)尽量不要把未完成的任务保存到本地磁盘,尤其是在经常扩缩容的弹性集群里;2)捕获InterruptedException异常后,不要做重试等耗时操作;3)需要监控任务都发送到MQ中的时间,以便调整kill -9强制执行前的等待时间。

(3)使用MQ保证任务必须执行完成

通过上面介绍的两种方式,可以处理大部分正常停止服务丢数据的任务。不过对于极端情况下,比如断电、断网等,需要严格保证任务不丢失的场景还是不能满足业务需要,这种情况下就需要依赖MQ。

方案是使用线程池的submit方法提交任务,通过future获取到任务执行完成再返回给MQ消费完成。在MQ中如何保证数据不丢失是另外一个复杂的话题了,这里不再深入探讨。

需要注意的是,如果采用这种方案,需要保证处理任务的幂等性,在操作步骤比较多的时候,复杂性也会很高。

5、ThreadLocal变量

ThreadLocal中变量的作用域是当前线程,使用线程池后会因跨线程导致数据不能传递,如果业务中使用了ThreadLocal,需要额外处理这种场景。

(1)InheritableThreadLocal

InheritableThreadLocal是在父子线程中自动传递参数,在线程池场景中不适用。

(2)手动处理

在提交任务前把ThreadLocal中的值取出来,在线程池执行时再set到线程池中线程的ThreadLocal中,并且在finally中清理数据。

缺点是每个线程池都要处理一遍,如果对上下文不熟悉,有漏传的风险。

(3)TransmittableThreadLocal

阿里开源地址:TransmittableThreadLocal

原理是通过javaagent自动处理ThreadLocal跨线程池传参,对业务开发者无感知,也是推荐的方案。

6、异常处理

(1)异常感知

execute方法:抛异常会被提交任务线程感知;

submit方法:抛异常不会被提交任务线程感知,在Future.get()执行时会被感知;

(2)统一处理方案1:异步任务里统一catch

在线程池的执行逻辑最外层,包装try、catch,处理所有异常。

缺点是: 1)所有的不同任务都要trycatch,增加了代码量。2)不存在checkedexception的地方也需要都trycatch起来,代码丑陋。

(3)统一处理方案2:覆写统一异常处理方法

此方案有两种常用实现:1)自定义线程池,继承ThreadPoolExecutor并覆写其afterExecute方法;2)创建线程池时自定义ThreadFactory,在实现里手动创建线程池,并调用Thread.setUncaughtExceptionHandler注册统一异常处理器。

(4)统一处理方案3:Future

任务提交都使用submit,并在Future.get()时捕获所有异常。

本文从创建线程池、队列注意事项、如何保证任务不丢失、ThreadLocal、异常等方面总结了笔者的一些思考,各位读者可以对照下自己的使用场景,看本文提到的问题是否都考虑到了呢,或者你还有什么线程池方面的使用经验,欢迎交流分享。

本文链接:Java线程池进阶

作者简介:木小丰,美团Java技术专家,专注分享软件研发实践、架构思考。欢迎关注公共号:Java研发

更多精彩文章:
从MVC到DDD的架构演进
平台化建设思路浅谈
构建可回滚的应用及上线checklist实践
Maven依赖冲突问题排查经验


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK