主要的实现FutureTask
# FutureTask实际上运行还是一个runnable,它对callable做了一个封装,让开发人员可以从其中获取返回值;FutrueTask是有状态的 共7种状态,四种状态变换的可能NEW -> COMPLETING -> EXCEPTIONALNEW -> CANCELLEDNEW -> COMPLETING -> NORMALNEW -> INTERRUPTING -> INTERRUPTED
Callable和runnable的区别
0. 通过call方法调用;1. 有返回值2. 可以抛异常
get结果的实现原理
1. 判断状态;2. 非NEW,COMPLETING状态则直接 进入report返回结果;3. 处于NEW,COMPLETING状态,则进入等待awaitDone();
3.x awaitDone 流程
3.1. 获取等待的超时时间deadline;3.2. 进入自旋3.3. 判断线程是否被中断:如果被中断则移出等待waiters队列;并抛出异常;3.4. 判断FutrueTask状态:如果">COMPLETING",代表执行完成,进入report;3.5. 判断FutrueTask状态:如果"=COMPLETING",让出CPU执行Thread.yield();3.6. 为当前线程创建一个node节点;3.7. 将当前线程WaitNode加入等待队列waiters中;3.8. 判断是否超时;3.9. 通过LockSupport.park挂起线程,等待运行许可;4. report返回执行结果:如果一切正常就返回执行结果,否则返回Exception;
run具体执行原理如下:
1. 判断状态是否正常,避免重复执行;2. 调用callable的call()方法;3. 修改执行状态;保存执行结果;并通知正在等待get的线程;## 3.x通知机制finishCompletion3.1. 获取所有waiters的集合;3.2. 通过cas 拿到执行权;3.3. 循环遍历所有等待的线程,通过LockSupport.unpark 唤醒其执行;
Callable和Future的实现原理(JDK8源码分析)
1. cancel 取消执行
public boolean cancel(boolean mayInterruptIfRunning) { // 判断状态:只有刚创建的情况下才能取消 // mayInterruptIfRunning:是否中断当前正在运行这个FutureTask的线程; if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // 如果要中断当前线程,则对runner发布interrupt信号; if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state // 修改状态为:已经通知线程进行中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 通知其他在等待结果的线程 finishCompletion(); } return true;}
2. run
public void run() { // 判断状态及设置futuretask归属的线程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 执行Callable result = c.call(); // 标记为执行成功 ran = true; } catch (Throwable ex) { result = null; // 标记为执行不成功 ran = false; // 设置为异常状态,并通知其他在等待结果的线程 setException(ex); } // 如果执行成功,修改状态为正常,并通知其他在等待结果的线程 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 如果状态为准备发起中断信号或者已经发出中断信号,则让出CPU(Thread.yield()) if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
3. get
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果还没执行完,则等待 if (s <= COMPLETING) s = awaitDone(false, 0L); // 通过report取结果 return report(s);}
3.1 report 取执行结果
private V report(int s) throws ExecutionException { Object x = outcome; // 如果一切正常,则返回x(x是callable执行的结果outcome) if (s == NORMAL) return (V)x; // 如果被取消,则抛出已取消异常 if (s >= CANCELLED) throw new CancellationException(); // 否则抛出执行异常 throw new ExecutionException((Throwable)x);}
3.2 awaitDone 等待FutureTask执行结束
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 记录等待超时的时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 多个在等待结果的线程,通过一个链表进行保存,waitNode就是每个线程在链表中的节点; WaitNode q = null; boolean queued = false; // 死循环...也可以说是自旋锁同步 for (;;) { // 判断当前这个调用get的线程是否被中断 if (Thread.interrupted()) { // 将当前线程移出队列 removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果状态非初创或执行完毕了,则跳出循环,通过report()取执行结果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果状态等于已执行,让出CPU执行,等待状态变为正常结束 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 如果当前线程还没有创建对象的waitNode节点,则创建一个 else if (q == null) q = new WaitNode(); // 如果当前线程对应的waitNode还没有加入到等待链表中,则加入进去; else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有设置等待超时时间,则通过parkNanos挂起当前线程,等待继续执行的信号 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 通过park挂起当前线程,等待task执行结束后给它发一个继续执行的信号(unpark) else LockSupport.park(this); }}
4. finishCompletion 通知所有在等待结果的线程
private void finishCompletion() { // assert state > COMPLETING; // 遍历所有正在等待执行结果的线程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // unpark,发布一个让它继续执行的“许可” LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint}