状态属性
Possible state transitions:
xxxxxxxxxx // 当前task状态 private volatile int state; // 任务尚未执行 private static final int NEW = 0; // 任务正在结束,但未完成 private static final int COMPLETING = 1; // 任务正常结束 private static final int NORMAL = 2; // 任务执行中发生异常 private static final int EXCEPTIONAL = 3; // 当前任务被取消 private static final int CANCELLED = 4; // 当前任务中断中 private static final int INTERRUPTING = 5; // 当前任务已中断 private static final int INTERRUPTED = 6;其他属性
xxxxxxxxxx// runnable使用适配器模式伪装成callableprivate Callable<V> callable;// 执行结果或抛出的异常private Object outcome;// 保存执行任务的线程对象引用private volatile Thread runner;// 可能有多个线程get任务结果,所以使用了一种栈结构private volatile WaitNode waiters;注意到线程池调用的submit重载方法有callable和runnable两种,解释了FutureTask为什么要有两种不同的构造方法
xxxxxxxxxxpublic Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;}
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}两种构造方法一种是callable,另一种是runnable
xxxxxxxxxxpublic FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); // callable是程序员自己实现的业务类 this.callable = callable; // 设置当前任务状态为NEW this.state = NEW;}
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW;}跟入Executors.callable方法
xxxxxxxxxxpublic static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 适配器模式将runnable转为callable return new RunnableAdapter<T>(task, result);}
// 继续跟入// 实现了Callable接口static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); // 注意到并没有返回真正的执行结果 return result; }}submut方法通过newTaskFor方法创建FutureTask对象,通过execute方法将FutureTask对象真正提交到线程池中(参考上文代码)
线程池的执行入口是FutureTask对象的run
xxxxxxxxxxpublic void run() { // 条件一:task已被执行 // 条件二:CAS失败(当前任务已被其他线程抢占) if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; // NEW状态且抢占成功 try { Callable<V> c = callable; // 防止空指针和外部线程cancel了当前任务 if (c != null && state == NEW) { // 执行结果的引用 V result; // true:callable.run代码执行成功无异常 // false:callable.run执行失败有异常 boolean ran; try { // 调用程序员实现的callable // 调用被装饰后的runnable result = c.call(); // call无异常 ran = true; } catch (Throwable ex) { // 执行异常 result = null; ran = false; // 设置异常 setException(ex); } // 设置执行结果 if (ran) set(result); } } finally { // 执行任务的线程清空 runner = null; int s = state; if (s >= INTERRUPTING) // 后续分析 handlePossibleCancellationInterrupt(s); }}set和setException方法类似
xxxxxxxxxxprotected void set(V v) { // 使用CAS设置任务完成中 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 执行结果赋值 outcome = v; // 设置任务状态为正常结束(直接设置内存) UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); }}
protected void setException(Throwable t) { // 使用CAS设置任务完成中 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 这里的t是抛出的异常 outcome = t; // 设置任务状态为异常(直接设置内存) UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}xxxxxxxxxx// 多个线程等待当前任务执行结果public V get() throws InterruptedException, ExecutionException { int s = state; // 状态还未结束 if (s <= COMPLETING) // 实现阻塞(核心) s = awaitDone(false, 0L); return report(s);}核心方法awaitDone
xxxxxxxxxxprivate int awaitDone(boolean timed, long nanos) throws InterruptedException { // 超时时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 当前引用线程封装为wait node WaitNode q = null; // 表示当前线程没有入队 boolean queued = false; for (;;) { // 当前线程是否被其他线程使用中断方式唤醒 // 只有第一次会返回true if (Thread.interrupted()) { // 当前线程从wait node出队(下文分析) removeWaiter(q); // 终端异常 throw new InterruptedException(); } int s = state; // 当前线程被unpark方式唤醒 // 如果最新状态是已经得到结果 if (s > COMPLETING) { // 判断是否为当前线程创建过wait node if (q != null) // GC q.thread = null; return s; } // 正在完成中 else if (s == COMPLETING) // 释放CPU进行下一次抢占 Thread.yield(); // 第一次循环:创建wait node对象 else if (q == null) q = new WaitNode(); // 第二次循环:已创建wait node对象但还未入队 else if (!queued) // waiters执行队列头 // CAS方式入队 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 第三次自旋: else if (timed) { // 处理有超时的情况 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 当前get操作的线程变为waiting状态 // 除非有其他线程唤醒或中断 LockSupport.park(this); }}跟入removeWaiter方法
xxxxxxxxxxprivate void removeWaiter(WaitNode node) { if (node != null) { // 引用线程GC node.thread = null; retry: for (;;) { for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; // 下一个节点是否为空 if (q.thread != null) // 前移 pred = q; // 当前节点是空,也就是不为头节点 else if (pred != null) { // 移除当前节点 pred.next = s; if (pred.thread == null) continue retry; } // CAS方式直接指向下一个节点 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } }}get最后的report方法
xxxxxxxxxxprivate V report(int s) throws ExecutionException { // 执行结果或异常 Object x = outcome; if (s == NORMAL) // 正常结果直接返回 return (V)x; if (s >= CANCELLED) // 用户取消则抛出取消异常 throw new CancellationException(); // 抛出执行异常 throw new ExecutionException((Throwable)x);}在分析到run的set方法中,有finishCompletion方法未分析
xxxxxxxxxxprivate void finishCompletion() { // 从头遍历队列 for (WaitNode q; (q = waiters) != null;) { // CAS置空防其他线程操作 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { // 当前wait node封装的线程 Thread t = q.thread; if (t != null) { // GC q.thread = null; // 唤醒当前线程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) // 当前处理完最后一个节点 break; // GC q.next = null; // 遍历队列 q = next; } break; } }
// done方法为空 done();
// GC callable = null;}用户如果主动调用,可以取消当前任务
xxxxxxxxxxpublic boolean cancel(boolean mayInterruptIfRunning) { // 任务正在运行中尝试用CAS方式中断或取消 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 取反又返回false有点绕 return false; try { if (mayInterruptIfRunning) { try { Thread t = runner; // 当前线程不为空 if (t != null) // 中断当前线程 t.interrupt(); } finally { // 终端完成设置已中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 唤醒所有get阻塞的线程 finishCompletion(); } return true;}