问题描述

在讲述时间方案时,先来看一下JDK提供的默认线程池工具类Executors创建线程有哪些问题:

  1. 使用 Executors.newCachedThreadPool() 会为每个任务分配一个线程。如果任务量大,线程数量将迅速增加,可能导致 CPU 过载内存溢出(OOM)。
  2. 使用 Executors.newFixedThreadPool() 任务队列为无界队列 LinkedBlockingQueue,这意味着如果任务过多,队列会不断增长,占用内存,可能导致 内存泄漏。

工具类虽然用着方便,但是也有不少问题出现,主要在于我们没法自定义一些参数。同时我们也需要更加显示的声明线程池,不同类型的任务最好使用不同的线程池,保障任务不会出现互相干扰的情况下,同时也增加了系统的安全性,如果某一个线程池崩了不会影响整个系统。

为了解决上面这些问题,我们就需要自建线程池。

自建线程池

这里使用 Spring 提供的线程池 ThreadPoolTaskExecutor,相比于 JUC 包提供的 ThreadPoolExecutor 它提供类额外的配置,比如在线程池关闭时的一些操作,这点往后看。

下列代码基于 Spring 应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 线程池配置
* @author Ershi
* @date 2024/11/29
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer{

/**
* 项目通用线程池
*/
public static final String HICHAT_EXECUTOR = "hichatExecutor";

/**
* 指定@Async使用的线程池
* @return {@link Executor}
*/
@Override
public Executor getAsyncExecutor() {
return hichatExecutor();
}

/**
* 自定义项目通用线程池
* @return {@link ThreadPoolTaskExecutor}
*/
@Bean(HICHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor hichatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("hichat-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 满了调用线程执行,认为重要任务
executor.initialize();
return executor;
}
}

这段代码做了两件事:

  1. 自建一个线程池
  2. 实现 AsyncConfigurer 接口,指定 Spring 提供的 @Async 注解使用本线程池

关于创建线程池时的参数可以参考这篇文章:https://www.cnblogs.com/zhaoguanglu/p/15653078.html

使用线程池

通过上述自建线程池后,我们可以显示的声明 ThreadPoolTaskExecutor,通过 ThreadPoolTaskExecutor.execute(() -> {任务}) 来执行异步任务,或者直接通过在方法上打上 @Async 注解一键开启异步执行:

image-20241202190722157

@Async 会默认使用第一个配置的 ThreadPool,或者通过 @Primary 指定的首个线程池。有多个线程池时,也可以通过指定线程池名字来标记使用哪个线程池:

优雅停机

通常线程池中执行的都是一些比较重要的任务,希望在项目停机时,先执行完池中的任务再关闭。

使用 JUC 的 ThreadPoolExecutor 的话,我们需要为该 bean 设置 destory方法,并在销毁时调用shutdown(),该方法会平滑关闭线程池,不接受新任务,等待已提交的任务执行完成。

但如果使用 Spring 提供的 ThreadPoolTaskExecutor,连优雅停机这件事我们都可以交给 Spring 自动管理。通过查看源码可以发现,ThreadPoolTaskExecutor 继承 ExecutorConfigurationSupport,而其中有一个参数名为 waitForTasksToCompleteOnShutdown,他就是用来控制优雅停机的开关的。

我们只需要在创建线程池的时候制定好该参数开启,即可实现优雅停机:

1
executor.setWaitForTasksToCompleteOnShutdown(true); // 优雅停机 => spring自带,线程池关闭时,等待任务执行完毕再关闭

线程池统一异常处理

传统模式下,我们可以通过 try-catch 捕捉到异常进行处理,如下:、

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
Thread thread =new Thread(()->{
try {
log.info("111");
throw new RuntimeException("运行时异常了");
} catch (RuntimeException e) {
log.error("异常发生", e);
}
});
thread.start();
}

image-20241202190907112

其实还有更简单的方法,上面的方案需要我们为每一个任务都单独进行处理,虽然很灵活,但一般一个线程池中执行的都是同一类任务,我们可以对其设置统一的异常处理。

默认异常处理

我们先来看看 jvm 对线程发生异常的默认处理是什么样的。

JVM 会在线程出现异常时,调用 Thread 类中的 dispatchUncaughtException 方法。

其默认异常处理器实现是 ThreadGroup.Class,可以看到其中默认就是通过控制台输出。

image-20241202190916258

而我们要做的就是为线程池中的 Thread 设置一个自定义的异常处理器。要对线程池中产生的线程做处理,是不是就想到了 ThreadFactory(线程工厂)!

自定义异常处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 全局线程异常处理器
* @author Ershi
* @date 2024/11/30
*/
@Slf4j
public class GlobalThreadUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

private static final GlobalThreadUncaughtExceptionHandler INSTANCE = new GlobalThreadUncaughtExceptionHandler();

/**
* 线程异常处理
* @param t
* @param e
*/
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("Exception in thread {}", t.getName(), e);
}

/**
* 获取单例
* @return {@link GlobalThreadUncaughtExceptionHandler}
*/
public static GlobalThreadUncaughtExceptionHandler getInstance() {
return INSTANCE;
}
}

自定义线程工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 自定义线程工厂
* @author Ershi
* @date 2024/11/30
*/
@AllArgsConstructor
public class MyThreadFactory implements ThreadFactory {


/**
* 基础线程工厂
*/
private ThreadFactory baseThreadFactory;

@Override
public Thread newThread(Runnable r) {
Thread thread = baseThreadFactory.newThread(r); // 执行基础线程工厂
// 执行自定义异常处理器
thread.setUncaughtExceptionHandler(GlobalThreadUncaughtExceptionHandler.getInstance());
return thread;
}
}

可以看到上述代码又引入了一个线程工厂,这是因为我们使用的是 Spring 提供的 ThreadPoolTaskExcuption 内部已经有自己的线程工厂了,并且做了一些操作:

image-20241202190924639

image-20241202190930221

我们如果要完全从头替换成自己的线程工厂,就需要将 Spring 做的操作全都 copy 一遍,很麻烦对吧。

那我们既要有之前的工厂内容,又要有我们自定义的工厂内容,最容易想到的就是 **装饰器模式 **了,我们可以通过 **组合/继承 **在现有的工厂基础上扩展功能。上述代码就是采用组合的方式。

下一步就是找到 Spring 使用的线程工厂是哪个了,将其作为基础线程工厂。通过翻找继承关系可以看到,其线程工厂就是本身:

image-20241202190937445

那只需要在配置线程池的时候加上这样一段代码即可,传入线程池本身:

1
executor.setThreadFactory(new MyThreadFactory(executor)); // 创建自定义线程工厂,以该spring线程工厂作为基础工厂