Redian新闻
>
并发编程 - FutureTask 解析

并发编程 - FutureTask 解析

公众号新闻

来源 | OSCHINA 社区

作者 | 京东云开发者-京东物流 丁冬

原文链接:https://my.oschina.net/u/4090830/blog/10091211

1、FutureTask 对象介绍

Future 对象大家都不陌生,是 JDK1.5 提供的接口,是用来以阻塞的方式获取线程异步执行完的结果。
在 Java 中想要通过线程执行一个任务,离不开 Runnable 与 Callable 这两个接口。
Runnable 与 Callable 的区别在于,Runnable 接口只有一个 run 方法,该方法用来执行逻辑,但是并没有返回值;而 Callable 的 call 方法,同样用来执行业务逻辑,但是是有一个返回值的。
Callable 执行任务过程中可以通过 FutureTask 获得任务的执行状态,并且可以在执行完成后通过 Future.get () 方式获取执行结果。
Future 是一个接口,而 FutureTask 就是 Future 的实现类。并且 FutureTask 实现了 RunnableFuture(Runnable + Future),说明我们可以创建一个 FutureTask 并直接把它放到线程池执行,然后获取 FutureTask 的执行结果。

2、FutureTask 源码解析

2.1 主要方法和属性

那么 FutureTask 是如何通过阻塞的方式来获取到异步线程执行的结果的呢?我们看下 FutureTask 中的属性。
// FutureTask的状态及其常量
privatevolatileint state;
privatestaticfinalint NEW =0;
privatestaticfinalint COMPLETING =1;
privatestaticfinalint NORMAL =2;
privatestaticfinalint EXCEPTIONAL =3;
privatestaticfinalint CANCELLED =4;
privatestaticfinalint INTERRUPTING =5;
privatestaticfinalint INTERRUPTED =6;

// callable对象,执行完后置空
privateCallable<V> callable;
// 要返回的结果或要引发的异常来自 get() 方法
privateObject outcome;// non-volatile, protected by state reads/writes
// 执行Callable的线程
privatevolatileThread runner;
// 等待线程的一个链表结构
privatevolatileWaitNode waiters;

FutureTask 中几个比较重要的方法。

// 取消任务的执行
booleancancel(boolean mayInterruptIfRunning);
// 返回任务是否已经被取消
booleanisCancelled();
// 返回任务是否已经完成,任务状态不为NEW即为完成
booleanisDone();
// 通过get方法获取任务的执行结果
Vget()throwsInterruptedException,ExecutionException;
// 通过get方法获取任务的执行结果,带有超时,如果超过给定时间则抛出异常
Vget(long timeout,TimeUnit unit)
throwsInterruptedException,ExecutionException,TimeoutException;

2.2 FutureTask 执行

当我们在线程池中执行一个 Callable 方法时,其实是将 Callable 任务封装成一个 RunnableFuture 对象去执行,同时将这个 RunnableFuture 对象返回,这样我们就拿到了 FutureTask 的引用,可以随时获取到任务执行的状态,并且可以在任务执行完成后通过该对象获取执行结果。
以下为 ThreadPoolExecutor 线程池提交一个 callable 方法的源码。
public<T> Future<T> submit(Callable<T> task){
if(task ==null)thrownewNullPointerException();
RunnableFuture<T> ftask =newTaskFor(task);
execute(ftask);
return ftask;
}

protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
returnnewFutureTask<T>(callable);
}

2.3 run 方法介绍

RunnableFuture 其实也是一个可以执行的 runnable,我们看下他的 run 方法。其主要流程就是执行 call 方法,正常执行完毕后将 result 结果赋值到 outcome 属性上。
publicvoidrun(){
if(state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null,Thread.currentThread()))
return;
try{
// 将callable赋值到本地变量
Callable<V> c = callable;
// 判断callable不为空并且FutureTask的状态必须为新创建
if(c !=null&& state == NEW){
V result;
boolean ran;
try{
// 执行call方法(用户自己实现的call逻辑),并获取到result结果
result
= c.call();
ran
=true;
}catch(Throwable ex){
result
=null;
ran
=false;
// 如果执行过程出现异常,则将异常对象赋值到outcome上
setException(ex);
}
// 如果正常执行完毕,则将result赋值到outcome属性上
if(ran)
set(result);
}
}finally{
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner
=null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if(s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

以下逻辑为正常执行完成后赋值的逻辑。

// 如果任务没有被取消,将future执行完的返回值赋值给result结果
// FutureTask任务的执行状态是通过CAS的方式进行赋值的,并且由此可知,COMPLETING其实是一个瞬时状态
// 当将线程执行结果赋值给outcome后,状态会修改为对应的NORMAL,即正常结束
protectedvoidset(V v){
if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
outcome
= v;
UNSAFE
.putOrderedInt(this, stateOffset, NORMAL);// final state
finishCompletion();
}
}

以下为执行异常时赋值逻辑,直接将 Throwable 对象赋值到 outcome 属性上。

protectedvoidsetException(Throwable t){
if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
outcome
= t;
UNSAFE
.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state
finishCompletion();
}
}

无论是正常执行还是异常执行,最终都会调用一个 finishCompletion 方法,用来做工作的收尾工作。

2.4 get 方法介绍

Future 的 get 方法有两个重载的方法,一个是 get () 获取结果,一个是 get (long, TimeUnit) 带有超时时间的获取结果,我们看下 FutureTask 中的这两个方法是如何实现的。
// 不带有超时时间,一直阻塞直到获取结果
publicVget()throwsInterruptedException,ExecutionException{
int s = state;
if(s <= COMPLETING)
// 等待结果完成,带有超时的get方法也是调用的awaitDone方法
s
=awaitDone(false,0L);
// 返回结果
returnreport(s);
}

// 带有超时时间的获取结果,如果超过时间还没有获取到结果则抛出异常
publicVget(long timeout,TimeUnit unit)
throwsInterruptedException,ExecutionException,TimeoutException{
if(unit ==null)
thrownewNullPointerException();
int s = state;
// 如果任务未中断,调用awaitDone方法等待任务结果
if(s <= COMPLETING &&
(s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING)
thrownewTimeoutException();
// 返回结果
returnreport(s);
}

我们主要看下 awaitDone 方法的执行逻辑。此方法会通过 for 循环的方式一直阻塞等待任务执行完成。如果带有超时时间,则超过截止时间后会直接返回。

// timed:是否需要超时获取
// nanos:超时时间单位纳秒
privateintawaitDone(boolean timed,long nanos)
throwsInterruptedException{
finallong deadline = timed ?System.nanoTime()+ nanos :0L;
WaitNode q =null;
boolean queued =false;
// 此方法会一直for循环判断任务状态是否已经完成,是Future.get阻塞的原因
for(;;){
if(Thread.interrupted()){
removeWaiter(q);
thrownewInterruptedException();
}

int s = state;
// 任务状态大于COMPLETING,则表明任务结束,直接返回
if(s > COMPLETING){
if(q !=null)
q
.thread =null;
return s;
}
elseif(s == COMPLETING)// cannot time out yet
// Thread.yield() 方法,使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。
// COMPLETING状态为瞬时状态,任务执行完成,要么是正常结束,要么异常结束,后续会被置为NORMAL或者EXCEPTIONAL
Thread.yield();
elseif(q ==null)
// 每调用一次get方法,都会创建一个WaitNode等待节点
q
=newWaitNode();
elseif(!queued)
// 将该等待节点添加到链表结构waiters中,q.next = waiters 即在waiters的头部插入
queued
= UNSAFE.compareAndSwapObject(this, waitersOffset,
q
.next = waiters, q);
// 如果方法带有超时判断,则判断当前时间是否已经超过了截止时间,如果超过了及截止日期,则退出循环直接返回当前状态,此时任务状态一定是NEW
elseif(timed){
nanos
= deadline -System.nanoTime();
if(nanos <=0L){
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

我们在看下 report 方法,在调用 get 方法时是如何返回结果的。
这里首先获取 outcome 的值,并判断任务是否已经执行完成,如果执行完成,则将 outcome 对象强转成泛型指定的类型;如果任务被取消了,则抛出一个 CancellationException 异常;如果都不是,则说明任务在执行过程中发生了异常,此时任务状态位 EXCEPTIONAL,此时的 outcome 即为 Throwable 对象,所以将 outcome 强转为 Throwable 并抛出异常。
由此可以知道,我们将一个 FutureTask 任务 submit 到线程池中执行的时候,如果发生了异常,是会在调用 get 方法的时候抛出的。
privateVreport(int s)throwsExecutionException{
Object x = outcome;
if(s == NORMAL)
return(V)x;
if(s >= CANCELLED)
thrownewCancellationException();
thrownewExecutionException((Throwable)x);
}

2.5 cancel 方法介绍

cancel 方法用于取消正在运行的任务,如果任务取消成功,则返回 TRUE,如果取消失败则返回 FALSE。
// mayInterruptIfRunning:允许中断正在运行的任务
publicbooleancancel(boolean mayInterruptIfRunning){
// mayInterruptIfRunning如果为true则将状态置为INTERRUPTING,如果未false则将状态置为CANCELLED
if(!(state == NEW &&
UNSAFE
.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning
? INTERRUPTING : CANCELLED)))
returnfalse;
// 如果状态修改成功后,判断是否允许中断线程,如果允许,则调用Thread的interrupt方法中断
try{// in case call to interrupt throws exception
if(mayInterruptIfRunning){
try{
Thread t = runner;
if(t !=null)
t
.interrupt();
}finally{// final state
UNSAFE
.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
}finally{
// 取消后的收尾工作
finishCompletion();
}
returntrue;
}

2.6 isDone/isCancelled 方法介绍

isDone 方法用于判断 FutureTask 是否已经完成;isCancelled 方法用来判断 FutureTask 是否已经取消,这两个方法都是通过状态位来判断的。
publicbooleanisCancelled(){
return state >= CANCELLED;
}

publicbooleanisDone(){
return state != NEW;
}

2.7 finishCompletion 方法介绍

我们看下 finishCompletion 方法都做了哪些工作。
// 删除所有等待线程并发出信号,最后执行done方法
privatevoidfinishCompletion(){
// assert state > COMPLETING;
for(WaitNode q;(q = waiters)!=null;){
if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){
for(;;){
Thread t = q.thread;
if(t !=null){
q
.thread =null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if(next ==null)
break;
q
.next =null;// unlink to help gc
q
= next;
}
break;
}
}

done();

callable
=null;// to reduce footprint
}

我们看到 done 方法是一个受保护的空方法,此处没有任何逻辑,由其子类去根据自己的业务去实现相应的逻辑。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protectedvoiddone(){}

3、总结

通过源码解读可以了解到 Future 的原理:
第一步:主线程将任务封装成一个 Callable 对象,通过 submit 方法提交到线程池去执行。
第二步:线程池执行任务的 run 方法,主线程则可以继续执行其他逻辑。
第三步:线程池中方法执行完成后将结果赋值到 outcome 属性上,并修改任务状态。
第四步:主线程在需要拿到异步任务结果的时候,主动调用 fugure.get () 方法来获取结果。
第五步:如果异步线程在执行过程中发生异常,则会在调用 future.get () 方法的时候抛出来。
以上就是对于 FutureTask 的分析,我们可以了解 FutureTask 任务执行的方式以及 Future.get 已阻塞的方式获取线程执行的结果原理,并且从代码中可以了解 FutureTask 的任务执行状态以及状态的变化过程。

 

END



批评红帽背后的利益群体是谁?



这里有最新开源资讯、软件更新、技术干货等内容

点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦


微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
JDK 21中的结构化并发:并发编程的一次飞跃【友情转发】对话合伙人(Face the Future)—中国新一代科技企业家创业机会及行业观点分享千秋岁 爱琴海词四首1折入!英国重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢,专柜同款Polo衫,舒适、透气、高品质!一笔勾勒,宫崎骏动漫世界!斯坦福大模型𝘚𝘬𝘦𝘵𝘤𝘩-𝘢-𝘚𝘬𝘦𝘵𝘤𝘩,草图秒变神作How a Controversial Play Captured Aranya’s Cultural Divide​下一代Transformer:RetNet结构可视化及Vision RetNet展望1折入!穿过国际重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢的男人,才会明白什么是品质!1篇Nature和两篇Nature子刊揭示血液因子PF4让大脑返老还童之谜𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢三防夹克,经典、优雅不凡的英伦风!还送长袖T恤!换季大捡漏!国际重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢经典纯色T恤,买一送一!官方宣布FF 91 2.0 Futurist Alliance第二阶段交付时间推迟Zhongkao Fallout in Xi’an Over ‘Returning Students’Risk-Return Tradeoffs (I)China’s Youth Are Hooked on a New Outdoor Sport: Lure Fishing1折入!穿过国际重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢的人,才是真正的有品!Nature 总结六大 ChatGPT 编程技巧:是非常强大的编程辅助工具!火了172年!国际重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢来了,又好穿又有品!!【26日投票-市长Furey菲瑞】与交通拥堵抗争,保留Gardiner大道,停止自行车道建设 -市长候选人Furey安东尼•菲利天才少年稚晖君首秀:首款人形机器人亮相,上得厨房入得工厂,成本价不到20万元|Future国际重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢中长款风衣,品位与优雅,兼顾帅气和女人味!【26日投票-市长Furey菲瑞】: 重夺我们的公园,公园是为儿童和家庭而设 !- 市长候选人Furey安东尼•菲利1折入!国际重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢雅格狮丹三防夹克,下单送长袖T恤!沁园春 曙光【26日投票-市长Furey菲瑞】加派500名警员,提升交通安全-Furey是希望结束我们街头和TTC上随机暴力的多市居民捍卫者换季大捡漏!国际重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢短袖买一送一!库存不多,手慢无!How Korea Quietly Reshaped Chinese Pop Culture再登哈林峰 - Ha Ling Peak法拉第未来交付首辆FF 91 2.0 Futurist Alliance忆秦娥:东篱偏隅,种植花卉【26日投票-市长Furey菲瑞】冲刺期间访问多伦多市的25个选区,呼吁中右翼团结。-市长候选人Furey安东尼•菲利【旅游】仅40欧/人!原价57欧!法国主题乐园Futuroscope门票!还有5欧的塞纳河游船票!Wing:人工智能时代的云开发编程语言沙漠中的動物舒適圈真没想到!国际重奢𝘼𝙦𝙪𝙖𝙨𝙘𝙪𝙩𝙪𝙢(雅格狮丹)风衣都让我们找来了(1折限时抢)
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。