Java线程池的实现原理及其在业务中的最佳实践
阿里妹导读
一、线程池简介
1.什么是线程池?
2.线程池有什么好处?
减少线程创建和销毁的开销,线程的创建和销毁需要消耗系统资源,线程池通过复用线程,避免了对资源的频繁操作,从而提高系统性能;
控制和优化系统资源利用,线程池通过控制线程的数量,可以尽可能地压榨机器性能,提高系统资源利用率;
提高响应速度,线程池可以预先创建线程且通过多线程并发处理任务,提升任务的响应速度及系统的并发性能;
二、Java线程池的实现原理
1.类继承关系
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理
submit(Runnable r):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是null
submit(Runnable r,Object result):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是传入的第二个参数result
shutdown():关闭线程池,不接受新任务,但是等待队列中的任务处理完毕才能真正关闭
shutdownNow():立即关闭线程池,不接受新任务,也不再处理等待队列中的任务,同时中断正在执行的线程
setCorePoolSize(int corePoolSize):设置核心线程数
setKeepAliveTime(long time, TimeUnit unit):设置线程的空闲时间
setMaximumPoolSize(int maximumPoolSize):设置最大线程数
setRejectedExecutionHandler(RejectedExecutionHandler rh):设置拒绝策略
setThreadFactory(ThreadFactory tf):设置线程工厂
beforeExecute(Thread t, Runnable r):任务执行之前的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
afterExecute(Runnable r, Throwable t):任务执行之后的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
2.线程池的状态
RUNNING:线程池一旦被创建,就处于RUNNING状态,任务数为0,能够接收新任务,对已排队的任务进行处理。
SHUTDOWN:不接收新任务,但能处理已排队的任务。当调用线程池的shutdown()方法时,线程池会由RUNNING转变为SHUTDOWN状态。
STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。当调用线程池的shutdownNow()方法时,线程池会由RUNNING或SHUTDOWN转变为STOP状态。
TIDYING:当线程池在SHUTDOWN状态下,任务队列为空且执行中任务为空,或者线程池在STOP状态下,线程池中执行中任务为空时,线程池会变为TIDYING状态,会执行terminated()方法。这个方法在线程池中是空实现,可以重写该方法进行相应的处理。
TERMINATED:线程池彻底终止。线程池在TIDYING状态执行完terminated()方法后,就会由TIDYING转变为TERMINATED状态。
3.线程池的执行流程
4.问题思考
线程池的核心线程可以回收吗?
线程池在提交任务前,可以提前创建线程吗?
三、源码分析
1.execute(Runnable command)
2.addWorker(Runnable firstTask, boolean core)
线程创建成功并添加到线程池后,会调用start()方法,启动线程,执行任务。
3.runWorker(Worker w)
4.getTask()
根据是否需要超时控制,提供两个阻塞方法获取阻塞队列中的任务。
5.processWorkerExit(w, completedAbruptly)
四、线程池在业务中的最佳实践
1.如何选择合适的线程池参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1.corePoolSize: 核心线程数
2.maximumPoolSize: 最大线程数
3.keepAliveTime: 线程的空闲时间
4.unit: 空闲时间的单位(秒、分、小时等等)
5.workQueue: 等待队列
6.threadFactory: 线程工厂
7.handler: 拒绝策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程直接处理该任务(可能为主线程Main),保证每个任务执行完毕
如何选择合适的线程池参数?
2.如何正确地创建线程池对象
FixedThreadPool:具有固定线程数量的线程池,无界阻塞队列;
CachedThreadPool:线程数量可以动态伸缩的线程池,最大线程数为Integer.MAX_VALUE
SingleThreadPool:单个线程的线程,核心线程数和最大线程数都是1,无界阻塞队列
public class TestThreadPool {
/**
* 线程池
*/
private static ExecutorService executor = initDefaultExecutor();
/**
* 统一的获取线程池对象方法
*/
public static ExecutorService getExecutor() {
return executor;
}
private static final int DEFAULT_THREAD_SIZE = 16;
private static final int DEFAULT_QUEUE_SIZE = 10240;
private static ExecutorService initDefaultExecutor() {
return new ThreadPoolExecutor(DEFAULT_THREAD_SIZE, DEFAULT_THREAD_SIZE,
300, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
new DefaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
局部变量定义的线程池对象在方法结束后可以被垃圾回收吗?
public static void main(String[] args) {
test1();
test2();
}
public static void test1(){
Object obj = new Object();
System.out.println("方法一执行完成");
}
public static void test2(){
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("方法二执行完成");
}
});
}
问题分析与解答
虚拟机栈(栈帧中的本地变量表)中引用的对象;
方法区中的类静态属性引用的对象;
方法区中常量引用的对象;
本地方法栈中JNI(即一般说的Native方法)引用的对象;
正在运行的线程;
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
public class Outer {
private String name;
private Inner inner;
public int outerMethod() {
return 1;
}
/**
* 非静态内部类
*/
class Inner {
private void innerMethod() {
//在非静态内部类中可以直接调用外部类的方法
outerMethod();
}
private String address;
}
}
class Outer$Inner {
private String address;
Outer$Inner(Outer var1) {
this.this$0 = var1;
}
private void innerMethod() {
this.this$0.outerMethod();
}
}
public class Outer {
private String name;
private Inner inner;
public int outerMethod() {
return 1;
}
/**
* 静态内部类
*/
static class Inner {
private String address;
}
}
class Outer$Inner {
private String address;
Outer$Inner() {
}
}
这个问题带来两个启发:
3.相互依赖的子任务避免使用同一线程池
public class FartherAndSonTask {
public static ExecutorService executor= TestThreadPool.getExecutor();
public static void main(String[] args) throws Exception {
FatherTask fatherTask = new FatherTask();
Future<String> future = executor.submit(fatherTask);
future.get();
}
/**
* 父任务,里面异步执行子任务
*/
static class FatherTask implements Callable<String> {
public String call() throws Exception {
System.out.println("开始执行父任务");
SonTask sonTask = new SonTask();
Future<String> future = executor.submit(sonTask);
String s = future.get();
System.out.println("父任务已拿到子任务执行结果");
return s;
}
}
/**
* 子任务
*/
static class SonTask implements Callable<String> {
public String call() throws Exception {
//处理一些业务逻辑
System.out.println("子任务执行完成");
return null;
}
}
}
使用不同的线程池隔离有相互依赖的任务;
调用future.get()方法设置超时时间,这样做可以避免线程阻塞,但是依然会出现大量的超时异常;
4.合理选择submit()和execute()方法
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理,轻量级方法,适用于处理不需要返回结果的任务;
submit(Runnable r):返回值为Future类型,future可以用来检查任务是否已经完成,获取任务的结果等,适用于需要处理返回结果的任务;
private void asyncSupplyPriceSync(List<Long> shidList, SupplyPriceSyncMsg msg) {
if (CollectionUtils.isEmpty(shidList)) {
return;
}
PlatformLogUtil.logInfo("异步推送酒店报价信息供给总数:", shidList.size());
final Map<String, Future >> futures = Maps.newLinkedHashMap();
//分批提交线程池处理
Lists.partition(shidList, SwitchConfig.HOTEL_PRICE_ASYNC_LIST_SIZE)
.forEach(subList -> {
try {
futures.put(UUID.randomUUID().toString(), executorService
.submit(() -> batchSupplyPriceSync(subList, msg)));
} catch (Exception e) {
PlatformLogUtil.logFail("异步推送报价信息线程池子任务执行异常", LogListUtil.newArrayList(subList), e);
}
});
//阻塞,等所有子任务都处理完,才返回结果
futures.forEach((uuid, future) -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
PlatformLogUtil.logFail("异步推送报价信息获取子任务执行结果异常", LogListUtil.newArrayList(e));
}
});
}
5.请捕获线程池中子任务的代码异常
public class ExceptionTest {
public static ExecutorService executor = TestThreadPool.getExecutor();
public static void main(String[] args) {
executor.execute(() -> test("正常"));
executor.execute(() -> test("正常"));
executor.execute(() -> test("任务执行异常"));
executor.execute(() -> test("正常"));
executor.shutdown();
}
public static void test(String str) {
String result = "当前ThreadName为" + Thread.currentThread().getName() + ":结果" + str;
if (str.equals("任务执行异常")) {
throw new RuntimeException(result + "****执行异常");
} else {
System.out.println(result);
}
}
}
如果线程池中执行任务的线程异常,发生异常的线程会销毁吗?其他任务还能正常执行吗?
可以发现
在processWorkerExit(w, completedAbruptly)方法内,可以看到如果运行中的线程池有线程执行异常,会调用workers.remove()移除当前线程,并调用addWorker()重新创建新的线程。
所以在任务3销毁线程再重新创建线程,和任务4创建线程这两个动作会有时序问题,具体看下图:
那么控制打印的异常信息是怎么来的呢?
所以,在业务代码中,请捕获子任务中的异常,否则会导致线程池中的工作线程频繁销毁、创建,造成资源浪费,违背了线程复用的设计原则。
微信扫码关注该文公众号作者