状态属性
Possible state transitions:
xxxxxxxxxx
// 当前task状态
private volatile int state;
// 任务尚未执行
private static final int NEW = 0;
// 任务正在结束,但未完成
private static final int COMPLETING = 1;
// 任务正常结束
private static final int NORMAL = 2;
// 任务执行中发生异常
private static final int EXCEPTIONAL = 3;
// 当前任务被取消
private static final int CANCELLED = 4;
// 当前任务中断中
private static final int INTERRUPTING = 5;
// 当前任务已中断
private static final int INTERRUPTED = 6;
其他属性
xxxxxxxxxx
// runnable使用适配器模式伪装成callable
private Callable<V> callable;
// 执行结果或抛出的异常
private Object outcome;
// 保存执行任务的线程对象引用
private volatile Thread runner;
// 可能有多个线程get任务结果,所以使用了一种栈结构
private volatile WaitNode waiters;
注意到线程池调用的submit
重载方法有callable
和runnable
两种,解释了FutureTask
为什么要有两种不同的构造方法
xxxxxxxxxx
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
两种构造方法一种是callable
,另一种是runnable
xxxxxxxxxx
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
// callable是程序员自己实现的业务类
this.callable = callable;
// 设置当前任务状态为NEW
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
跟入Executors.callable
方法
xxxxxxxxxx
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
// 适配器模式将runnable转为callable
return new RunnableAdapter<T>(task, result);
}
// 继续跟入
// 实现了Callable接口
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
// 注意到并没有返回真正的执行结果
return result;
}
}
submut
方法通过newTaskFor
方法创建FutureTask
对象,通过execute
方法将FutureTask
对象真正提交到线程池中(参考上文代码)
线程池的执行入口是FutureTask
对象的run
xxxxxxxxxx
public void run() {
// 条件一:task已被执行
// 条件二:CAS失败(当前任务已被其他线程抢占)
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
// NEW状态且抢占成功
try {
Callable<V> c = callable;
// 防止空指针和外部线程cancel了当前任务
if (c != null && state == NEW) {
// 执行结果的引用
V result;
// true:callable.run代码执行成功无异常
// false:callable.run执行失败有异常
boolean ran;
try {
// 调用程序员实现的callable
// 调用被装饰后的runnable
result = c.call();
// call无异常
ran = true;
} catch (Throwable ex) {
// 执行异常
result = null;
ran = false;
// 设置异常
setException(ex);
}
// 设置执行结果
if (ran)
set(result);
}
} finally {
// 执行任务的线程清空
runner = null;
int s = state;
if (s >= INTERRUPTING)
// 后续分析
handlePossibleCancellationInterrupt(s);
}
}
set
和setException
方法类似
xxxxxxxxxx
protected void set(V v) {
// 使用CAS设置任务完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 执行结果赋值
outcome = v;
// 设置任务状态为正常结束(直接设置内存)
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
protected void setException(Throwable t) {
// 使用CAS设置任务完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 这里的t是抛出的异常
outcome = t;
// 设置任务状态为异常(直接设置内存)
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
xxxxxxxxxx
// 多个线程等待当前任务执行结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 状态还未结束
if (s <= COMPLETING)
// 实现阻塞(核心)
s = awaitDone(false, 0L);
return report(s);
}
核心方法awaitDone
xxxxxxxxxx
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前引用线程封装为wait node
WaitNode q = null;
// 表示当前线程没有入队
boolean queued = false;
for (;;) {
// 当前线程是否被其他线程使用中断方式唤醒
// 只有第一次会返回true
if (Thread.interrupted()) {
// 当前线程从wait node出队(下文分析)
removeWaiter(q);
// 终端异常
throw new InterruptedException();
}
int s = state;
// 当前线程被unpark方式唤醒
// 如果最新状态是已经得到结果
if (s > COMPLETING) {
// 判断是否为当前线程创建过wait node
if (q != null)
// GC
q.thread = null;
return s;
}
// 正在完成中
else if (s == COMPLETING)
// 释放CPU进行下一次抢占
Thread.yield();
// 第一次循环:创建wait node对象
else if (q == null)
q = new WaitNode();
// 第二次循环:已创建wait node对象但还未入队
else if (!queued)
// waiters执行队列头
// CAS方式入队
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 第三次自旋:
else if (timed) {
// 处理有超时的情况
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 当前get操作的线程变为waiting状态
// 除非有其他线程唤醒或中断
LockSupport.park(this);
}
}
跟入removeWaiter
方法
xxxxxxxxxx
private void removeWaiter(WaitNode node) {
if (node != null) {
// 引用线程GC
node.thread = null;
retry:
for (;;) {
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 下一个节点是否为空
if (q.thread != null)
// 前移
pred = q;
// 当前节点是空,也就是不为头节点
else if (pred != null) {
// 移除当前节点
pred.next = s;
if (pred.thread == null)
continue retry;
}
// CAS方式直接指向下一个节点
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
get最后的report
方法
xxxxxxxxxx
private V report(int s) throws ExecutionException {
// 执行结果或异常
Object x = outcome;
if (s == NORMAL)
// 正常结果直接返回
return (V)x;
if (s >= CANCELLED)
// 用户取消则抛出取消异常
throw new CancellationException();
// 抛出执行异常
throw new ExecutionException((Throwable)x);
}
在分析到run
的set
方法中,有finishCompletion
方法未分析
xxxxxxxxxx
private void finishCompletion() {
// 从头遍历队列
for (WaitNode q; (q = waiters) != null;) {
// CAS置空防其他线程操作
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 当前wait node封装的线程
Thread t = q.thread;
if (t != null) {
// GC
q.thread = null;
// 唤醒当前线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
// 当前处理完最后一个节点
break;
// GC
q.next = null;
// 遍历队列
q = next;
}
break;
}
}
// done方法为空
done();
// GC
callable = null;
}
用户如果主动调用,可以取消当前任务
xxxxxxxxxx
public boolean cancel(boolean mayInterruptIfRunning) {
// 任务正在运行中尝试用CAS方式中断或取消
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// 取反又返回false有点绕
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
// 当前线程不为空
if (t != null)
// 中断当前线程
t.interrupt();
} finally {
// 终端完成设置已中断
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒所有get阻塞的线程
finishCompletion();
}
return true;
}