并发编程 - FutureTask 解析
来源 | OSCHINA 社区
作者 | 京东云开发者-京东物流 丁冬
原文链接:https://my.oschina.net/u/4090830/blog/10091211
1、FutureTask 对象介绍
2、FutureTask 源码解析
2.1 主要方法和属性
// FutureTask的状态及其常量
privatevolatileint state;
privatestaticfinalint NEW =0;
privatestaticfinalint COMPLETING =1;
privatestaticfinalint NORMAL =2;
privatestaticfinalint EXCEPTIONAL =3;
privatestaticfinalint CANCELLED =4;
privatestaticfinalint INTERRUPTING =5;
privatestaticfinalint INTERRUPTED =6;
// callable对象,执行完后置空
privateCallable<V> callable;
// 要返回的结果或要引发的异常来自 get() 方法
privateObject outcome;// non-volatile, protected by state reads/writes
// 执行Callable的线程
privatevolatileThread runner;
// 等待线程的一个链表结构
privatevolatileWaitNode waiters;
FutureTask 中几个比较重要的方法。
// 取消任务的执行
booleancancel(boolean mayInterruptIfRunning);
// 返回任务是否已经被取消
booleanisCancelled();
// 返回任务是否已经完成,任务状态不为NEW即为完成
booleanisDone();
// 通过get方法获取任务的执行结果
Vget()throwsInterruptedException,ExecutionException;
// 通过get方法获取任务的执行结果,带有超时,如果超过给定时间则抛出异常
Vget(long timeout,TimeUnit unit)
throwsInterruptedException,ExecutionException,TimeoutException;
2.2 FutureTask 执行
public<T> Future<T> submit(Callable<T> task){
if(task ==null)thrownewNullPointerException();
RunnableFuture<T> ftask =newTaskFor(task);
execute(ftask);
return ftask;
}
protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
returnnewFutureTask<T>(callable);
}
2.3 run 方法介绍
publicvoidrun(){
if(state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null,Thread.currentThread()))
return;
try{
// 将callable赋值到本地变量
Callable<V> c = callable;
// 判断callable不为空并且FutureTask的状态必须为新创建
if(c !=null&& state == NEW){
V result;
boolean ran;
try{
// 执行call方法(用户自己实现的call逻辑),并获取到result结果
result = c.call();
ran =true;
}catch(Throwable ex){
result =null;
ran =false;
// 如果执行过程出现异常,则将异常对象赋值到outcome上
setException(ex);
}
// 如果正常执行完毕,则将result赋值到outcome属性上
if(ran)
set(result);
}
}finally{
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner =null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if(s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
以下逻辑为正常执行完成后赋值的逻辑。
// 如果任务没有被取消,将future执行完的返回值赋值给result结果
// FutureTask任务的执行状态是通过CAS的方式进行赋值的,并且由此可知,COMPLETING其实是一个瞬时状态
// 当将线程执行结果赋值给outcome后,状态会修改为对应的NORMAL,即正常结束
protectedvoidset(V v){
if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final state
finishCompletion();
}
}
以下为执行异常时赋值逻辑,直接将 Throwable 对象赋值到 outcome 属性上。
protectedvoidsetException(Throwable t){
if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state
finishCompletion();
}
}
2.4 get 方法介绍
// 不带有超时时间,一直阻塞直到获取结果
publicVget()throwsInterruptedException,ExecutionException{
int s = state;
if(s <= COMPLETING)
// 等待结果完成,带有超时的get方法也是调用的awaitDone方法
s =awaitDone(false,0L);
// 返回结果
returnreport(s);
}
// 带有超时时间的获取结果,如果超过时间还没有获取到结果则抛出异常
publicVget(long timeout,TimeUnit unit)
throwsInterruptedException,ExecutionException,TimeoutException{
if(unit ==null)
thrownewNullPointerException();
int s = state;
// 如果任务未中断,调用awaitDone方法等待任务结果
if(s <= COMPLETING &&
(s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING)
thrownewTimeoutException();
// 返回结果
returnreport(s);
}
我们主要看下 awaitDone 方法的执行逻辑。此方法会通过 for 循环的方式一直阻塞等待任务执行完成。如果带有超时时间,则超过截止时间后会直接返回。
// timed:是否需要超时获取
// nanos:超时时间单位纳秒
privateintawaitDone(boolean timed,long nanos)
throwsInterruptedException{
finallong deadline = timed ?System.nanoTime()+ nanos :0L;
WaitNode q =null;
boolean queued =false;
// 此方法会一直for循环判断任务状态是否已经完成,是Future.get阻塞的原因
for(;;){
if(Thread.interrupted()){
removeWaiter(q);
thrownewInterruptedException();
}
int s = state;
// 任务状态大于COMPLETING,则表明任务结束,直接返回
if(s > COMPLETING){
if(q !=null)
q.thread =null;
return s;
}
elseif(s == COMPLETING)// cannot time out yet
// Thread.yield() 方法,使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。
// COMPLETING状态为瞬时状态,任务执行完成,要么是正常结束,要么异常结束,后续会被置为NORMAL或者EXCEPTIONAL
Thread.yield();
elseif(q ==null)
// 每调用一次get方法,都会创建一个WaitNode等待节点
q =newWaitNode();
elseif(!queued)
// 将该等待节点添加到链表结构waiters中,q.next = waiters 即在waiters的头部插入
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果方法带有超时判断,则判断当前时间是否已经超过了截止时间,如果超过了及截止日期,则退出循环直接返回当前状态,此时任务状态一定是NEW
elseif(timed){
nanos = deadline -System.nanoTime();
if(nanos <=0L){
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
privateVreport(int s)throwsExecutionException{
Object x = outcome;
if(s == NORMAL)
return(V)x;
if(s >= CANCELLED)
thrownewCancellationException();
thrownewExecutionException((Throwable)x);
}
2.5 cancel 方法介绍
// mayInterruptIfRunning:允许中断正在运行的任务
publicbooleancancel(boolean mayInterruptIfRunning){
// mayInterruptIfRunning如果为true则将状态置为INTERRUPTING,如果未false则将状态置为CANCELLED
if(!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
returnfalse;
// 如果状态修改成功后,判断是否允许中断线程,如果允许,则调用Thread的interrupt方法中断
try{// in case call to interrupt throws exception
if(mayInterruptIfRunning){
try{
Thread t = runner;
if(t !=null)
t.interrupt();
}finally{// final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
}finally{
// 取消后的收尾工作
finishCompletion();
}
returntrue;
}
2.6 isDone/isCancelled 方法介绍
publicbooleanisCancelled(){
return state >= CANCELLED;
}
publicbooleanisDone(){
return state != NEW;
}
2.7 finishCompletion 方法介绍
// 删除所有等待线程并发出信号,最后执行done方法
privatevoidfinishCompletion(){
// assert state > COMPLETING;
for(WaitNode q;(q = waiters)!=null;){
if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){
for(;;){
Thread t = q.thread;
if(t !=null){
q.thread =null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if(next ==null)
break;
q.next =null;// unlink to help gc
q = next;
}
break;
}
}
done();
callable =null;// to reduce footprint
}
protectedvoiddone(){}
3、总结
END
点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦
微信扫码关注该文公众号作者
戳这里提交新闻线索和高质量文章给我们。
来源: qq
点击查看作者最近其他文章