private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state 属性是整个 FutureTask 的核心属性,它代表了任务在运行过程中的状态,随着任务的执行,状态将不断地进行转变;任务的起始状态都是 NEW,中间状态为 COMPLETING / INTERRUPTING,其他为终止状态;任务的中间状态是一个瞬态,它非常的短暂,而且任务的中间态并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果,所以只要state不处于 NEW 状态,就说明任务已经执行完毕。
FutureTask 中的队列
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
// FutureTask 的获取结果的方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
// FutureTask 的初始状态是 NEW,如果任务的状态还处在 NEW 或者 COMPLETING 的状态
// 就等待结果
if (s <= COMPLETING)
// 等待任务处理完成
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// deadline 用来控制超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 调用 get 时,任务可能还没结束,q 就是表示一个等待任务的线程节点,可能会被
// 加入到 Treiber 栈中
WaitNode q = null;
// 是否已经入队列
boolean queued = false;
// 自旋
for (;;) {
// 这里是为了响应调用任务的线程的中断
if (Thread.interrupted()) {
// 如果触发了中断,如果当前节点已经加入到 treiber stack 中的话,就删除掉
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 当前如果已经到了终态,就返回结果
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 如果任务状态正在设置中的话,说明任务已经执行完成了,这时理论上会非常块到达终态
// 此时调用 yield 是获取任务结果的线程主动让出 CPU,然后后续再竞争 CPU,以期任务
// 线程已经将结果处理完成
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 获取到 CPU 后,继续从这里开始执行
else if (q == null)
// 说明 q 既没有入队,而且还在等待任务结果,而且是第一次进到这里
// 就创建一个新的等待节点,节点里的 thread 是当前等待任务结果的线程
// 然后通过自旋进入到下一次循环和判断逻辑里
q = new WaitNode();
else if (!queued)
// 到这里说明已经初始化了 node,但是还没有入队
// 执行入队的动作,这里使用了 CAS 操作, 意思是
// 将 q 的 next 指向原来的头,把新的头指向新节点 q
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 等待任务的线程是否设置了超时时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 如果没有设置超时线程,就一直阻塞,直到任务完成后,调用任务线程调用 unpark
LockSupport.park(this);
}
}
finishCompletion
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
run()
// 一般是由线程池中的线程来执行
public void run() {
// 任务在线程池中执行时,状态必须是 NEW
// 如果存在多线程竞争执行一个任务,只有竞争成功的才可以去执行任务,这里是通过 CAS 操作实现的
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}