& & &Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自/a/jichuzhishi/0.html)& & &本篇博文分析Executor中几个比较重要的接口和类。& & &Executor1 public interface Executor {2
void execute(Runnable command);3 }& & &Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它没有直接的实现类,有一个重要的子接口ExecutorService。& & &ExecutorService 1 //继承自Executor接口 2 public interface ExecutorService extends Executor { 3
* 关闭方法,调用后执行之前提交的任务,不再接受新的任务 5
void shutdown(); 7
* 从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表 9
List&Runnable& shutdownNow();11
* 判断执行器是否已经关闭13
boolean isShutdown();15
* 关闭后所有任务是否都已完成17
boolean isTerminated();19
boolean awaitTermination(long timeout, TimeUnit unit)23
throws InterruptedE24
* 提交一个Callable任务26
&T& Future&T& submit(Callable&T& task);28
* 提交一个Runable任务,result要返回的结果30
&T& Future&T& submit(Runnable task, T result);32
* 提交一个任务34
Future&?& submit(Runnable task);36
* 执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表38
&T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks)40
throws InterruptedE41
* 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。43
&T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks,45
long timeout, TimeUnit unit)46
throws InterruptedE47
* 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。49
&T& T invokeAny(Collection&? extends Callable&T&& tasks)51
throws InterruptedException, ExecutionE52
* 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。54
&T& T invokeAny(Collection&? extends Callable&T&& tasks,56
long timeout, TimeUnit unit)57
throws InterruptedException, ExecutionException, TimeoutE58 }& & ExecutorService接口继承自Executor接口,定义了终止、提交任务、跟踪任务返回结果等方法。& & ExecutorService涉及到Runnable、Callable、Future接口,这些接口的具体内容如下。 1 // 实现Runnable接口的类将被Thread执行,表示一个基本的任务 2 public interface Runnable { 3
// run方法就是它所有的内容,就是实际执行的任务 4
public abstract void run(); 5 } 6 // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容 7 public interface Callable&V& { 8
// 相对于run方法的带有返回值的call方法 9
V call() throws E10 }Future 1 // Future代表异步任务的执行结果 2 public interface Future&V& { 3
* 尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。 6
boolean cancel(boolean mayInterruptIfRunning); 8
* 返回代表的任务是否在完成之前被取消了11
boolean isCancelled();13 14
* 如果任务已经完成,返回true16
boolean isDone();18 19
* 获取异步任务的执行结果(如果任务没执行完将等待)21
V get() throws InterruptedException, ExecutionE23 24
* 获取异步任务的执行结果(有最常等待时间的限制)26
V get(long timeout, TimeUnit unit)30
throws InterruptedException, ExecutionException, TimeoutE31 }& & &ExecutorService有一个子接口ScheduledExecutorService和一个抽象实现类AbstractExecutorService。& & &ScheduledExecutorService 1 // 可以安排指定时间或周期性的执行任务的ExecutorService 2 public interface ScheduledExecutorService extends ExecutorService { 3
* 在指定延迟后执行一个任务,只执行一次 5
public ScheduledFuture&?& schedule(Runnable command, 7
long delay, TimeUnit unit); 8
* 与上面的方法相同,只是接受的是Callable任务10
public &V& ScheduledFuture&V& schedule(Callable&V& callable,12
long delay, TimeUnit unit);13
* 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit15
* 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...16
public ScheduledFuture&?& scheduleAtFixedRate(Runnable command,18
long initialDelay,19
long period,20
TimeUnit unit);21
* 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit23
* 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...24
public ScheduledFuture&?& scheduleWithFixedDelay(Runnable command,26
long initialDelay,27
long delay,28
TimeUnit unit);29 }& &&ScheduledExecutorService定义了四个方法,已经在上面给出基本的解释。ScheduledExecutorService有两个实现类,分别是DelegatedScheduledExecutorService和ScheduledThreadPoolExecutor,将在后面介绍。还需要解释的是ScheduledFuture。& &&ScheduledFuture继承自Future和Delayed接口,自身没有添加方法。Delayed接口定义了一个获取剩余延迟的方法。& & &AbstractExecutorService
1 // 提供ExecutorService的默认实现
2 public abstract class AbstractExecutorService implements ExecutorService {
* 为指定的Runnable和value构造一个FutureTask,value表示默认被返回的Future
protected &T& RunnableFuture&T& newTaskFor(Runnable runnable, T value) {
return new FutureTask&T&(runnable, value);
* 为指定的Callable创建一个FutureTask 11
protected &T& RunnableFuture&T& newTaskFor(Callable&T& callable) { 13
return new FutureTask&T&(callable); 14
* 提交Runnable任务 17
public Future&?& submit(Runnable task) { 19
if (task == null) throw new NullPointerException(); 20
// 通过newTaskFor方法构造RunnableFuture,默认的返回值是null 21
RunnableFuture&Object& ftask = newTaskFor(task, null); 22
// 调用具体实现的execute方法 23
execute(ftask); 24
* 提交Runnable任务 28
public &T& Future&T& submit(Runnable task, T result) { 30
if (task == null) throw new NullPointerException(); 31
// 通过newTaskFor方法构造RunnableFuture,默认的返回值是result 32
RunnableFuture&T& ftask = newTaskFor(task, result); 33
execute(ftask); 34
* 提交Callable任务 38
public &T& Future&T& submit(Callable&T& task) { 40
if (task == null) throw new NullPointerException(); 41
RunnableFuture&T& ftask = newTaskFor(task); 42
execute(ftask); 43
* doInvokeAny的具体实现(核心内容),其它几个方法都是重载方法,都对这个方法进行调用 48
* tasks 是被执行的任务集,timed标志是否定时的,nanos表示定时的情况下执行任务的限制时间 49
private &T& T doInvokeAny(Collection&? extends Callable&T&& tasks, 51
boolean timed, long nanos) 52
throws InterruptedException, ExecutionException, TimeoutException { 53
// tasks空判断 54
if (tasks == null) 55
throw new NullPointerException(); 56
// 任务数量 57
int ntasks = tasks.size(); 58
if (ntasks == 0) 59
throw new IllegalArgumentException(); 60
// 创建对应数量的Future返回集 61
List&Future&T&& futures= new ArrayList&Future&T&&(ntasks); 62
ExecutorCompletionService&T& ecs = 63
new ExecutorCompletionService&T&(this); 64
// 执行异常 66
ExecutionException ee = 67
// System.nanoTime()根据系统计时器当回当前的纳秒值 68
long lastTime = (timed)? System.nanoTime() : 0; 69
// 获取任务集的遍历器 70
Iterator&? extends Callable&T&& it = tasks.iterator(); 71
// 向执行器ExecutorCompletionService提交一个任务,并将结果加入futures中 73
futures.add(ecs.submit( 74
// 修改任务计数器 75
// 活跃任务计数器 77
int active = 1; 78
for (;;) { 79
// 获取并移除代表已完成任务的Future,如果不存在,返回null 80
Future&T& f = ecs.poll(); 81
if (f == null) { 82
// 没有任务完成,且任务集中还有未提交的任务 83
if (ntasks & 0) { 84
// 剩余任务计数器减1 85
// 提交任务并添加结果 87
futures.add(ecs.submit(; 88
// 活跃任务计数器加1 89
// 没有剩余任务,且没有活跃任务(所有任务可能都会取消),跳过这一次循环 92
else if (active == 0) 93
else if (timed) { 95
// 获取并移除代表已完成任务的Future,如果不存在,会等待nanos指定的纳秒数 96
f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 97
if (f == null) 98
throw new TimeoutException(); 99
// 计算剩余可用时间100
long now = System.nanoTime();101
nanos -= now - lastT102
lastTime =103
// 获取并移除表示下一个已完成任务的未来,等待,如果目前不存在。106
// 执行到这一步说明已经没有任务任务可以提交,只能等待某一个任务的返回107
f = ecs.take();108
// f不为空说明有一个任务完成了110
if (f != null) {111
// 已完成一个任务,所以活跃任务计数减1112
// 返回该任务的结果115
return f.get();116
} catch (InterruptedException ie) {117
} catch (ExecutionException eex) {119
} catch (RuntimeException rex) {121
ee = new ExecutionException(rex);122
// 如果没有成功返回结果则抛出异常126
if (ee == null)127
ee = new ExecutionException();128 129 130
} finally {131
// 无论执行中发生异常还是顺利结束,都将取消剩余未执行的任务132
for (Future&T& f : futures)133
public &T& T invokeAny(Collection&? extends Callable&T&& tasks)138
throws InterruptedException, ExecutionException {139
// 非定时任务的doInvokeAny调用141
return doInvokeAny(tasks, false, 0);142
} catch (TimeoutException cannotHappen) {143
// 定时任务的invokeAny调用,timeout表示超时时间,unit表示时间单位148
public &T& T invokeAny(Collection&? extends Callable&T&& tasks,149
long timeout, TimeUnit unit)150
throws InterruptedException, ExecutionException, TimeoutException {151
return doInvokeAny(tasks, true, unit.toNanos(timeout));152
// 无超时设置的invokeAll方法154
public &T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks)155
throws InterruptedException {156
// 空任务判断157
if (tasks == null)158
throw new NullPointerException();159
// 创建大小为任务数量的结果集160
List&Future&T&& futures = new ArrayList&Future&T&&(tasks.size());161
// 是否完成所有任务的标记162
boolean done =163
// 遍历并执行任务165
for (Callable&T& t : tasks) {166
RunnableFuture&T& f = newTaskFor(t);167
// 遍历结果集171
for (Future&T& f : futures) {172
// 如果某个任务没完成,通过f调用get()方法173
if (!f.isDone()) {174
// get方法等待计算完成,然后获取结果(会等待)。所以调用get后任务就会完成计算,否则会等待176
} catch (CancellationException ignore) {178
} catch (ExecutionException ignore) {179
// 标志所有任务执行完成183
// 返回结果185
} finally {187
// 假如没有完成所有任务(可能是发生异常等情况),将任务取消188
if (!done)189
for (Future&T& f : futures)190
// 超时设置的invokeAll方法194
public &T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks,195
long timeout, TimeUnit unit)196
throws InterruptedException {197
// 需要执行的任务集为空或时间单位为空,抛出异常198
if (tasks == null || unit == null)199
throw new NullPointerException();200
// 将超时时间转为纳秒单位201
long nanos = unit.toNanos(timeout);202
// 创建任务结果集203
List&Future&T&& futures = new ArrayList&Future&T&&(tasks.size());204
// 是否全部完成的标志205
boolean done =206
// 遍历tasks,将任务转为RunnableFuture208
for (Callable&T& t : tasks)209
// 记录当前时间(单位是纳秒)211
long lastTime = System.nanoTime();212
// 获取迭代器213
Iterator&Future&T&& it = futures.iterator();214
// 遍历215
while (it.hasNext()) {216
// 执行任务217
// 记录当前时间219
long now = System.nanoTime();220
// 计算剩余可用时间221
nanos -= now - lastT222
// 更新上一次执行时间223
lastTime =224
// 超时,返回保存任务状态的结果集225
if (nanos &= 0)226
for (Future&T& f : futures) {230
// 如果有任务没完成231
if (!f.isDone()) {232
// 时间已经用完,返回保存任务状态的结果集233
if (nanos &= 0)234
// 获取计算结果,最多等待给定的时间nanos,单位是纳秒237
f.get(nanos, TimeUnit.NANOSECONDS);238
} catch (CancellationException ignore) {239
} catch (ExecutionException ignore) {240
} catch (TimeoutException toe) {241
// 计算可用时间244
long now = System.nanoTime();245
nanos -= now - lastT246
lastTime =247
// 修改是否全部完成的标记250
// 返回结果集252
} finally {254
// 假如没有完成所有任务(可能是时间已经用完、发生异常等情况),将任务取消255
if (!done)256
for (Future&T& f : futures)257
}260 }& & AbstractExecutor实现了ExecutorService接口的部分方法。具体代码的分析在上面已经给出。& &&AbstractExecutor有两个子类:DelegatedExecutorService、ThreadPoolExecutor。将在后面介绍。& & 下面是AbstractExecutor中涉及到的RunnableFuture、FutureTask、ExecutorCompletionService。& &&RunnableFuture继承自Future和Runnable,只有一个run()方法(Runnable中已经有一个run方法了,为什么RunnableFuture还要重新写一个run方法呢?求高手指教)。RunnableFuture接口看上去就像是Future和Runnable两个接口的组合。& & FutureTask实现了RunnableFuture接口,除了实现了Future和Runnable中的方法外,它还有自己的方法和一个内部类Sync。& &&ExecutorCompletionService实现了CompletionService接口,将结果从复杂的一部分物种解耦出来。这些内容后续会介绍,不过这里先介绍框架中的其它内容,弄清整体框架。& & 下面看继承自AbstractExecutorService的ThreadPoolExecutor。& &&ThreadPoolExecutorThreadPoolExecutor(好长)
1 public class ThreadPoolExecutor extends AbstractExecutorService {
// 检查关闭的权限
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/* runState提供了主要的生命周期控制,可取值有以下几个:
* RUNNING:接受新的任务,处理队列中的任务
* SHUTDOWN:不再接受新的任务,但是处理队列中的任务
* STOP:不接受新任务,也不处理队列中的任务,打断正在处理的任务
* TERMINATED:和STOP类似,同时终止所有线程 10
On invocation of shutdown(), perhaps implicitly in finalize() 12
On invocation of shutdownNow() 14
When both queue and pool are empty 16
When pool is empty 18
volatile int runS 21
static final int RUNNING
static final int SHUTDOWN
static final int STOP
static final int TERMINATED = 3; 25
// 用于保持任务的队列 27
private final BlockingQueue&Runnable& workQ 28
// poolSize, corePoolSize, maximumPoolSize, runState, workers set的更新锁 29
private final ReentrantLock mainLock = new ReentrantLock(); 30
// mainLock锁的一个Condition实例 31
private final Condition termination = mainLock.newCondition(); 32
// 保持线程池中所有的工作线程。只有获取mainLock锁后才能访问。 33
private final HashSet&Worker& workers = new HashSet&Worker&(); 34
// 空闲线程的等待时间,大为是纳秒 35
private volatile long
keepAliveT 36
// 是否允许核心线程&活着& false(默认值)允许,哪怕空闲;true则使用keepAliveTime来控制等待超时时间 37
private volatile boolean allowCoreThreadTimeO 38
// 核心线程池的大小 39
private volatile int
corePoolS 40
// pool size最大值 41
private volatile int
maximumPoolS 42
// 当前pool大小 43
private volatile int
// 拒绝执行的处理器 顾名思义,当一个任务被拒绝执行后将同个这个处理器进行处理 45
private volatile RejectedExecutionH 46
// 线程工厂,用于创建线程 47
private volatile ThreadFactory threadF 48
// 最终pool size达到的最大值 49
private int largestPoolS 50
// 已完成任务计数 51
private long completedTaskC 52
// 默认的拒绝执行的处理器 53
private static final RejectedExecutionHandler defaultHandler = 54
new AbortPolicy(); 55
* 关于借个size的说明: 57
* 线程池数量poolSize指工作线程Worker对象的集合workers的实际大小,通过workers.size()可直接获得。
* 核心线程池数量corePoolSize,可理解为工作线程Worker对象的集合workers的目标大小。 59
* 如果poolSize & corePoolSize,那么ThreadPoolExecutor就会有机制在适当的时候回收闲置的线程。 60
* 最大线程池数量maxPoolSize,就是工作线程Worker对象的集合workers的大小上限。 61
* 假如说任务队列满了,再来新任务时,若poolSize还没达到maxPoolSize,则继续创建新的线程来执行新任务, 62
* 若不幸poolSize达到了上限maxPoolSize,那不能再创建新的线程了,只能采取reject策略来拒绝新任务。 63
/** 构造方法 开始*/ 65
public ThreadPoolExecutor(int corePoolSize, 66
int maximumPoolSize, 67
long keepAliveTime, 68
TimeUnit unit, 69
BlockingQueue&Runnable& workQueue) { 70
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 71
Executors.defaultThreadFactory(), defaultHandler); 72
public ThreadPoolExecutor(int corePoolSize, 74
int maximumPoolSize, 75
long keepAliveTime, 76
TimeUnit unit, 77
BlockingQueue&Runnable& workQueue, 78
ThreadFactory threadFactory) { 79
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 80
threadFactory, defaultHandler); 81
public ThreadPoolExecutor(int corePoolSize, 83
int maximumPoolSize, 84
long keepAliveTime, 85
TimeUnit unit, 86
BlockingQueue&Runnable& workQueue, 87
RejectedExecutionHandler handler) { 88
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 89
Executors.defaultThreadFactory(), handler); 90
// 主要的构造方法,其它构造方法都是对这个方法的调用 92
public ThreadPoolExecutor(int corePoolSize, 93
int maximumPoolSize, 94
long keepAliveTime, 95
TimeUnit unit, 96
BlockingQueue&Runnable& workQueue, 97
ThreadFactory threadFactory, 98
RejectedExecutionHandler handler) { 99
// 非法输入(明显这些值都是不能小于0的)100
if (corePoolSize & 0 ||101
maximumPoolSize &= 0 ||102
maximumPoolSize & corePoolSize ||103
keepAliveTime & 0)104
throw new IllegalArgumentException();105
// 空判断106
if (workQueue == null || threadFactory == null || handler == null)107
throw new NullPointerException();108
this.corePoolSize = corePoolS109
this.maximumPoolSize = maximumPoolS110
this.workQueue = workQ111
this.keepAliveTime = unit.toNanos(keepAliveTime);112
this.threadFactory = threadF113
this.handler =114
/** 构造方法 结束*/116
// 执行Runnable任务119
public void execute(Runnable command) {120
if (command == null)121
throw new NullPointerException();122
就是new 一个新工作线程放到workers集合中,让这个新线程来执行当前的任务command,而这个新线程可以认为是核心线程池中的其中一个线程。*/128
if (poolSize &= corePoolSize || !addIfUnderCorePoolSize(command)) {129
// 线程池状态时RUNNING且能将任务添加到worker队列中130
if (runState == RUNNING && workQueue.offer(command)) {131
// 加入了队列以后,只要保证有工作线程就ok了,工作线程会自动去执行任务队列的。132
// 所以判断一下if ( runState != RUNNING || poolSize == 0),133
// 在这个if为true时候,去保证一下任务队列有线程会执行,即执行ensureQueuedTaskHandled(command)方法。134
// 这里有两种情况,情况一:runState != RUNNING,这种情况在ensureQueuedTaskHandled方法中会把任务丢给reject拒绝策略处理,135
// 情况二:poolSize == 0,这种情况是new一个新线程加入到工作线程集合workers中。136
if (runState != RUNNING || poolSize == 0)137
// 程序执行到这个分支,说明当前状态runState != RUNNING,或者任务队列workQueue已经满了。140
// 先看第一个条件下,前面解释过runState,除了RUNNING状态,其他三个状态都不能接收新任务,141
// 所以当runState != RUNNING时新任务只能根据reject策略拒绝,142
// 而这个拒绝的逻辑是在addIfUnderMaximumPoolSize方法中实现的;143
// 再看第二个条件下,workQueue已经满,潜在的条件是runState == RUNNING,这种情况怎么处理新任务呢?144
// 很简单,若当前线程数量poolSize没有达到最大线程数量maxPoolSize,145
// 则创建新的线程去执行这个无法加入任务队列的新任务,146
// 否则就根据reject策略拒绝147
else if (!addIfUnderMaximumPoolSize(command))148
reject(command); // is shutdown or saturated149
private Thread addThread(Runnable firstTask) {153
Worker w = new Worker(firstTask);154
// 创建一个新Thread t155
Thread t = threadFactory.newThread(w);156
if (t != null) {157
w.thread =158
int nt = ++poolS160
// 跟踪线程池大小的最大值161
if (nt & largestPoolSize)162
largestPoolSize =163
// 创建并启动新线程执行firstTask(在运行线程数小于核心线程池大小的情况且状态是RUNNING)168
private boolean addIfUnderCorePoolSize(Runnable firstTask) {169
Thread t =170
final ReentrantLock mainLock = this.mainL171
// 获取锁172
if (poolSize & corePoolSize && runState == RUNNING)175
// 创建一个新线程176
t = addThread(firstTask);177
} finally {178
// 释放锁179
if (t == null)182
// 启动线程执行任务184
// 创建并启动新线程执行firstTask(在运行线程数小于pool size的最大值的情况且状态是RUNNING)189
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {190
Thread t =191
final ReentrantLock mainLock = this.mainL192
if (poolSize & maximumPoolSize && runState == RUNNING)195
t = addThread(firstTask);196
} finally {197
if (t == null)200
// 确保任务被处理206
private void ensureQueuedTaskHandled(Runnable command) {207
final ReentrantLock mainLock = this.mainL208
// 拒绝标记210
boolean reject =211
Thread t =212
int state = runS214
// 如果状态不是RUNNING,能成功从worker队列中移除,则拒绝这个任务执行215
if (state != RUNNING && workQueue.remove(command))216
reject =217
else if (state & STOP &&218
poolSize & Math.max(corePoolSize, 1) &&219
t = addThread(null);221
} finally {222
if (reject)225
else if (t != null)227
// 不用拒绝任务则启动线程执行任务228
// 调用RejectedExecutionHandler决绝任务232
void reject(Runnable command) {233
handler.rejectedExecution(command, this);234
// 工作线程,实现了Runnable接口236
private final class Worker implements Runnable {237
// 每个任务执行都必须获取和释放runLock。这主要是防止中断的目的是取消工作线程,而不是中断正在运行的任务。238
private final ReentrantLock runLock = new ReentrantLock();239
// 要执行的任务240
private Runnable firstT241
// 每个线程完成任务的计数器,最后会统计到completedTaskCount中242
volatile long completedT243
// 用于执行Runnable的线程244
// 构造方法246
Worker(Runnable firstTask) {247
this.firstTask = firstT248
// 判断这个线程是否活动250
boolean isActive() {251
return runLock.isLocked();252
// 中断闲置线程254
void interruptIfIdle() {255
final ReentrantLock runLock = this.runL256
if (runLock.tryLock()) {257
if (thread != Thread.currentThread())259
} finally {261
// 中断266
void interruptNow() {267
private void runTask(Runnable task) {272
final ReentrantLock runLock = this.runL273
if (runState & STOP &&277
Thread.interrupted() &&278
runState &= STOP)279
boolean ran =282
beforeExecute(thread, task);283;285
afterExecute(task, null);287
} catch (RuntimeException ex) {289
if (!ran)290
afterExecute(task, ex);291
} finally {294
public void run() {300
Runnable task = firstT302
firstTask =303
* 注意这段while循环的执行逻辑,每执行完一个核心线程后,就会去线程池 305
* 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止 306
while (task != null || (task = getTask()) != null) {308
} finally {313
// 当前工作线程退出
// 从池队列中取的核心线程(任务)的方法320
Runnable getTask() {321
for (;;) {322
// 获取运行状态324
int state = runS325
if (state & SHUTDOWN)327
// SHUTDOWN状态330
if (state == SHUTDOWN)
// 帮助清空队列331
r = workQueue.poll();332
// 状态时RUNNING,且poolSize & corePoolSize或allowCoreThreadTimeOut333
else if (poolSize & corePoolSize || allowCoreThreadTimeOut)334
// 获取并移除元素(等待指定的时间)335
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);336
// 获取并移除元素(会一直等待知道获取到有效元素)338
r = workQueue.take();339
// 获取结果不为空,返回340
if (r != null)341
// 检查一个获取任务失败的线程能否退出343
if (workerCanExit()) {344
if (runState &= SHUTDOWN) // 中断其他线程345
// Else retry349
} catch (InterruptedException ie) {350
// On interruption, re-check runState351
// 检查一个获取任务失败的线程能否退出356
private boolean workerCanExit() {357
final ReentrantLock mainLock = this.mainL358
boolean canE360
// 可以退出的条件是状态为STOP或TERMINATED或至少有一个处理非空队列的线程(在允许超时的情况下)362
canExit = runState &= STOP ||363
workQueue.isEmpty() ||364
(allowCoreThreadTimeOut &&365
poolSize & Math.max(1, corePoolSize));366
} finally {367
return canE370
// 中断其他线程373
void interruptIdleWorkers() {374
final ReentrantLock mainLock = this.mainL375
// 遍历工作线程378
for (Worker w : workers)379
// 尝试中断闲置线程380
} finally {382
// 工作线程退出要处理的逻辑 386
void workerDone(Worker w) {387
final ReentrantLock mainLock = this.mainL388
completedTaskCount += w.completedT391
if (--poolSize == 0)//poolSize减一,这时其实又可以创建工作线程了
} finally {395
// 尝试终止400
private void tryTerminate() {401
if (poolSize == 0) {403
int state = runS404
* 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个 406
* 工作线程来执行线程队列中等待的任务 407
if (state & STOP && !workQueue.isEmpty()) {409
state = RUNNING; // disable termination check below410
Thread t = addThread(null);411
if (t != null)412
// 设置池状态为终止状态
if (state == STOP || state == SHUTDOWN) {416
runState = TERMINATED;417
// 发起一个有序的关闭在以前已提交任务的执行,但不接受新任务。如果已经关闭,调用不会有其他影响。423
public void shutdown() {424
// Gets the system security interface.425
SecurityManager security = System.getSecurityManager();426
if (security != null)427
// 检查权限(以抛出异常的形式)428
final ReentrantLock mainLock = this.mainL430
if (security != null) { // 检查调用者是否能修改线程433
for (Worker w : workers)434
// 获取运行状态437
int state = runS438
// 小于SHUTDOWN的不就是RUNNING么。。。439
if (state & SHUTDOWN)440
runState = SHUTDOWN;441 442
for (Worker w : workers) {444
// 中断线程445
} catch (SecurityException se) { // Try to back out448
runState =449
// tryTerminate() here would be a no-op450
// 尝试终止453
tryTerminate(); // Terminate now if pool and queue empty454
} finally {455
public List&Runnable& shutdownNow() {461
SecurityManager security = System.getSecurityManager();462
if (security != null)463
security.checkPermission(shutdownPerm);464 465
final ReentrantLock mainLock = this.mainL466
if (security != null) { // Check if caller can modify our threads469
for (Worker w : workers)470
int state = runS474
// 与上一个方法主要区别在于状态和interruptNow方法475
if (state & STOP)476
runState = STOP;477 478
for (Worker w : workers) {480
} catch (SecurityException se) { // Try to back out483
runState =484
// tryTerminate() here would be a no-op485
List&Runnable& tasks = drainQueue();489
tryTerminate(); // Terminate now if pool and queue empty490
} finally {492
// 清空队列497
private List&Runnable& drainQueue() {498
List&Runnable& taskList = new ArrayList&Runnable&();499
// 将队列中的所有元素一到taskList中500
while (!workQueue.isEmpty()) {502
Iterator&Runnable& it = workQueue.iterator();503
if (it.hasNext()) {505
Runnable r =;506
// 从workQueue中移除,并添加到taskList中507
if (workQueue.remove(r))508
} catch (ConcurrentModificationException ignore) {511
return taskL514
public boolean isShutdown() {517
return runState != RUNNING;518
boolean isStopped() {522
return runState == STOP;523
public boolean isTerminating() {527
int state = runS528
return state == SHUTDOWN || state == STOP;529
public boolean isTerminated() {532
return runState == TERMINATED;533
public boolean awaitTermination(long timeout, TimeUnit unit)536
throws InterruptedException {537
long nanos = unit.toNanos(timeout);538
final ReentrantLock mainLock = this.mainL539
for (;;) {542
if (runState == TERMINATED)543
if (nanos &= 0)545
nanos = termination.awaitNanos(nanos);547
} finally {549
protected void finalize()
public void setThreadFactory(ThreadFactory threadFactory) {560
if (threadFactory == null)561
throw new NullPointerException();562
this.threadFactory = threadF563
public ThreadFactory getThreadFactory() {567
return threadF568
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {572
if (handler == null)573
throw new NullPointerException();574
this.handler =575
public RejectedExecutionHandler getRejectedExecutionHandler() {579
// 设置核心线程数 这里的设置将覆盖构造方法中的设置583
// 如果小于构造方法的设置,多余的线程将被闲置584
// 如果大于构造方法的设置,新线程将被用于执行排队的任务585
public void setCorePoolSize(int corePoolSize) {586
if (corePoolSize & 0)587
throw new IllegalArgumentException();588
final ReentrantLock mainLock = this.mainL589
int extra = this.corePoolSize - corePoolS592
this.corePoolSize = corePoolS593
// 大于构造方法的设置594
if (extra & 0) {595
int n = workQueue.size(); 596
while (extra++ & 0 && n-- & 0 && poolSize & corePoolSize) {597
Thread t = addThread(null);598
if (t != null)599
// 小于构造方法的设置605
else if (extra & 0 && poolSize & corePoolSize) {606
Iterator&Worker& it = workers.iterator();608
while (it.hasNext() &&609
extra-- & 0 &&610
poolSize & corePoolSize &&611
workQueue.remainingCapacity() == 0)612;613
} catch (SecurityException ignore) {614
// N it is OK if the threads stay live615
} finally {618
public int getCorePoolSize() {624
return corePoolS625
public boolean prestartCoreThread() {629
return addIfUnderCorePoolSize(null);630
public int prestartAllCoreThreads() {634
int n = 0;635
while (addIfUnderCorePoolSize(null))636
public boolean allowsCoreThreadTimeOut() {642
return allowCoreThreadTimeO643
public void allowCoreThreadTimeOut(boolean value) {647
if (value && keepAliveTime &= 0)648
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");649 650
allowCoreThreadTimeOut =651
// 设置所允许的最大的线程数。这将覆盖在构造函数中设置的任何值。如果新值小于当前值,多余的现有线程将被终止时,他们成为闲置。654
public void setMaximumPoolSize(int maximumPoolSize) {655
if (maximumPoolSize &= 0 || maximumPoolSize & corePoolSize)656
throw new IllegalArgumentException();657
final ReentrantLock mainLock = this.mainL658
int extra = this.maximumPoolSize - maximumPoolS661
this.maximumPoolSize = maximumPoolS662
if (extra & 0 && poolSize & maximumPoolSize) {663
Iterator&Worker& it = workers.iterator();665
while (it.hasNext() &&666
extra & 0 &&667
poolSize & maximumPoolSize) {668;669
} catch (SecurityException ignore) {672
// N it is OK if the threads stay live673
} finally {676
public int getMaximumPoolSize() {682
return maximumPoolS683
public void setKeepAliveTime(long time, TimeUnit unit) {687
if (time & 0)688
throw new IllegalArgumentException();689
if (time == 0 && allowsCoreThreadTimeOut())690
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");691
this.keepAliveTime = unit.toNanos(time);692
public long getKeepAliveTime(TimeUnit unit) {696
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);697
public BlockingQueue&Runnable& getQueue() {701
return workQ702
public boolean remove(Runnable task) {706
return getQueue().remove(task);707
// 移除所有被取消的任务710
public void purge() {711
// Fail if we encounter interference during traversal712
Iterator&Runnable& it = getQueue().iterator();714
while (it.hasNext()) {715
Runnable r =;716
if (r instanceof Future&?&) {717
Future&?& c = (Future&?&)r;718
if (c.isCancelled())719
catch (ConcurrentModificationException ex) {724725
public int getPoolSize() {730
return poolS731
// 获取活跃线程数734
public int getActiveCount() {735
final ReentrantLock mainLock = this.mainL736
int n = 0;739
for (Worker w : workers) {740
if (w.isActive())741
} finally {745
public int getLargestPoolSize() {751
final ReentrantLock mainLock = this.mainL752
return largestPoolS755
} finally {756
public long getTaskCount() {762
final ReentrantLock mainLock = this.mainL763
long n = completedTaskC766
for (Worker w : workers) {767
// 统计已经完成的任务768
n += w.completedT769
// 如果w是活跃线程,说明正在执行一个任务,所以n加一770
if (w.isActive())771
// 加上队列中的任务774
return n + workQueue.size();775
} finally {776
// 获取已完成的任务数781
public long getCompletedTaskCount() {782
final ReentrantLock mainLock = this.mainL783
long n = completedTaskC786
for (Worker w : workers)787
n += w.completedT788 789
} finally {790
protected void beforeExecute(Thread t, Runnable r) { }796 797
protected void afterExecute(Runnable r, Throwable t) { }799 800
protected void terminated() { }802 803
// 实现了RejectedExecutionHandler,即是一个拒绝执行的Handler804
public static class CallerRunsPolicy implements RejectedExecutionHandler {805
public CallerRunsPolicy() { }807 808
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {810
if (!e.isShutdown()) {811;812
public static class AbortPolicy implements RejectedExecutionHandler {818
public AbortPolicy() { }820 821
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {823
throw new RejectedExecutionException();824
public static class DiscardPolicy implements RejectedExecutionHandler {829
public DiscardPolicy() { }831 832
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {834
public static class DiscardOldestPolicy implements RejectedExecutionHandler {839
public DiscardOldestPolicy() { }841 842
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {844
if (!e.isShutdown()) {845
}850 }& & 可以参考http://xtu-/blog/647744& & 从上面的框架结构图中可以可以看出剩下的就是ScheduledThreadPoolExecutor和Executors。Executors是一个工具类,提供一些工厂和实用方法。& & 下面看ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口。& &&ScheduledThreadPoolExecutorScheduledThreadPoolExecutor// 可以安排指定时间或周期性的执行任务的ExecutorServicepublic class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
// 在Shutdown的时候如果要取消或关闭任务,设置为false;true表示继续执行任务,在Shutdown之后
private volatile boolean continueExistingPeriodicTasksAfterS
// false表示在Shutdown的时候取消Delayed的任务;true表示执行这个任务
private volatile boolean executeExistingDelayedTasksAfterShutdown =
// 打破调度联系,进而保证先进先出的顺序捆绑项目中的序列号
private static final AtomicLong sequencer = new AtomicLong(0);
// 基准时间
private static final long NANO_ORIGIN = System.nanoTime();
// 当前时间(相对于基准时间的值)
final long now() {
return System.nanoTime() - NANO_ORIGIN;
// RunnableScheduledFuture接口表示是否是周期性的
private class ScheduledFutureTask&V&
extends FutureTask&V& implements RunnableScheduledFuture&V& {
private final long sequenceN
// 预定安排执行的时刻
// 表示重复任务,0表示不重复,正数表示固定比率,负数表示固定延时
// 构造方法,构造一个只执行一次的任务
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time =
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
// 构造方法,构造一个按指定ns开始执行,指定period周期性执行
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time =
this.period =
this.sequenceNumber = sequencer.getAndIncrement();
// 构造方法,构造一个延时执行的任务
ScheduledFutureTask(Callable&V& callable, long ns) {
this.time =
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
// 按指定单位获取延时时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
// 判断传入延时和这个任务延时之间的大小关系
public int compareTo(Delayed other) {
// 为什么可以和Delayed比较?因为这个类实现了RunnableScheduledFuture接口,而RunnableScheduledFuture接口继承自Delayed接口
if (other == this) // compare zero ONLY if same object
// other是ScheduledFutureTask实例
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask&?& x = (ScheduledFutureTask&?&)
long diff = time - x.
// 比较大小
if (diff & 0)
return -1;
else if (diff & 0)
else if (sequenceNumber & x.sequenceNumber)
return -1;
long d = (getDelay(TimeUnit.NANOSECONDS) -
return (d == 0)? 0 : ((d & 0)? -1 : 1);
// 是否周期性的(包括延时的情况)
public boolean isPeriodic() {
return period != 0;
// 执行周期性的任务
private void runPeriodic() {
// 执行任务
boolean ok = ScheduledFutureTask.super.runAndReset();
// 判断是否已经shutdown
boolean down = isShutdown();
// 重新安排任务(如果没有shutdown或在没有关闭且允许在shutdown之后执行已存在的任务)
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
if (p & 0)
// 计算下一次执行的时间
// 计算触发时间
time = triggerTime(-p);
// 将任务添加到队列中
else if (down)
// 执行任务,根据是否周期性调用不同的方法
public void run() {
if (isPeriodic())
// 延迟执行
private void delayedExecute(Runnable command) {
// 如果已经shutdown,决绝任务
if (isShutdown()) {
if (getPoolSize() & getCorePoolSize())
// 预启动线程
// 取消和清除关闭政策不允许运行的任务
private void cancelUnwantedTasks() {
// 获取shutdown策略
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic)
else if (keepDelayed || keepPeriodic) {
Object[] entries = super.getQueue().toArray();
for (int i = 0; i & entries. ++i) {
Object e = entries[i];
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture&?& t = (RunnableScheduledFuture&?&)e;
// 根据是否周期性的任务通过制定的值判断进行取消操作
if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
// 净化,移除已经取消的任务
public boolean remove(Runnable task) {
if (!(task instanceof RunnableScheduledFuture))
return getQueue().remove(task);
protected &V& RunnableScheduledFuture&V& decorateTask(
Runnable runnable, RunnableScheduledFuture&V& task) {
protected &V& RunnableScheduledFuture&V& decorateTask(
Callable&V& callable, RunnableScheduledFuture&V& task) {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), handler);
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay & 0) ? 0 : delay));
long triggerTime(long delay) {
return now() +
((delay & (Long.MAX_VALUE && 1)) ? delay : overflowFree(delay));
// 避免移除,返回延迟的值
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
if (headDelay & 0 && (delay - headDelay & 0))
delay = Long.MAX_VALUE + headD
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
// 根据执行的延时时间执行任务
public ScheduledFuture&?& schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// ScheduledFutureTask的result为null
RunnableScheduledFuture&?& t = decorateTask(command,
new ScheduledFutureTask&Void&(command, null,
triggerTime(delay, unit)));
// 延时执行
// 上一个方法的重载形式,接收的是Callable
public &V& ScheduledFuture&V& schedule(Callable&V& callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture&V& t = decorateTask(callable,
new ScheduledFutureTask&V&(callable,
triggerTime(delay, unit)));
* 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit
* 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...
public ScheduledFuture&?& scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period &= 0)
throw new IllegalArgumentException();
RunnableScheduledFuture&?& t = decorateTask(command,
new ScheduledFutureTask&Object&(command,
triggerTime(initialDelay, unit),
* 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit
* 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...
public ScheduledFuture&?& scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay &= 0)
throw new IllegalArgumentException();
RunnableScheduledFuture&?& t = decorateTask(command,
new ScheduledFutureTask&Boolean&(command,
triggerTime(initialDelay, unit),
// 执行任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 立即执行,延时时间是0
schedule(command, 0, TimeUnit.NANOSECONDS);
// 重新 AbstractExecutorService 的方法
public Future&?& submit(Runnable task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
public &T& Future&T& submit(Runnable task, T result) {
return schedule(Executors.callable(task, result),
public &T& Future&T& submit(Callable&T& task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
continueExistingPeriodicTasksAfterShutdown =
if (!value && isShutdown())
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
return continueExistingPeriodicTasksAfterS
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
executeExistingDelayedTasksAfterShutdown =
if (!value && isShutdown())
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
return executeExistingDelayedTasksAfterS
public void shutdown() {
// 取消任务
// 立即关闭,调用的是父类立即关闭的方法
public List&Runnable& shutdownNow() {
return super.shutdownNow();
// 返回使用这个执行器的任务队列
public BlockingQueue&Runnable& getQueue() {
return super.getQueue();
// 将DelayQueue&RunnableScheduledFuture& 包装为 BlockingQueue&Runnable&的类
// 类似于代理
private static class DelayedWorkQueue
extends AbstractCollection&Runnable&
implements BlockingQueue&Runnable& {
private final DelayQueue&RunnableScheduledFuture& dq = new DelayQueue&RunnableScheduledFuture&();
public Runnable poll() { return dq.poll(); }
public Runnable peek() { return dq.peek(); }
public Runnable take() throws InterruptedException { return dq.take(); }
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
return dq.poll(timeout, unit);
public boolean add(Runnable x) {
return dq.add((RunnableScheduledFuture)x);
public boolean offer(Runnable x) {
return dq.offer((RunnableScheduledFuture)x);
public void put(Runnable x) {
public boolean offer(Runnable x, long timeout, TimeUnit unit) {
return dq.offer((RunnableScheduledFuture)x, timeout, unit);
public Runnable remove() { return dq.remove(); }
public Runnable element() { return dq.element(); }
public void clear() { dq.clear(); }
public int drainTo(Collection&? super Runnable& c) { return dq.drainTo(c); }
public int drainTo(Collection&? super Runnable& c, int maxElements) {
return dq.drainTo(c, maxElements);
public int remainingCapacity() { return dq.remainingCapacity(); }
public boolean remove(Object x) { return dq.remove(x); }
public boolean contains(Object x) { return dq.contains(x); }
public int size() { return dq.size(); }
public boolean isEmpty() { return dq.isEmpty(); }
public Object[] toArray() { return dq.toArray(); }
public &T& T[] toArray(T[] array) { return dq.toArray(array); }
public Iterator&Runnable& iterator() {
return new Iterator&Runnable&() {
private Iterator&RunnableScheduledFuture& it = dq.iterator();
public boolean hasNext() { return it.hasNext(); }
public Runnable next() { return; }
public void remove() { it.remove(); }
}}& & 在代码中都加了注释,我想大致能解释清楚吧。& & Executor涉及的类还是比较多的,到此为止剩下的还有Executors& &&Executors& &&Executors中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。此类支持以下各种方法:创建并返回设置有常用配置字符串的 ExecutorService 的方法。创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。创建并返回&包装的&ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。&& & &Executors提供的都是工具形式的方法,所以都是static的,并且这个类也没有必要实例化,所以它的构造方法时private的。下面主要看一下几个内部类。  &RunnableAdapter 1 static final class RunnableAdapter&T& implements Callable&T& { 2
task, T result) { 5
this.task = 6
this.result = 7
public T call() { 9;10
}12 }& & 适配器。以Callable的形式执行Runnable并且返回给定的result。& & PrivilegedCallable 1 static final class PrivilegedCallable&T& implements Callable&T& { 2
private final AccessControlC 3
private final Callable&T& 4
private T 5
private E 6
PrivilegedCallable(Callable&T& task) { 7
this.task = 8
this.acc = AccessController.getContext(); 9
public T call() throws Exception {12
AccessController.doPrivileged(new PrivilegedAction&T&() {13
public T run() {14
result =;16
} catch (Exception ex) {17
exception =18
}, acc);22
if (exception != null)23
}27 }& & 在访问控制下运行的Callable。涉及到Java.security包中的内容。& &&PrivilegedCallableUsingCurrentClassLoader类与上面的PrivilegedCallable类似,只是使用的是CurrentClassLoader。& &&DefaultThreadFactory 1
static class DefaultThreadFactory implements ThreadFactory { 2
static final AtomicInteger poolNumber = new AtomicInteger(1); 3
final ThreadG 4
final AtomicInteger threadNumber = new AtomicInteger(1); 5
final String nameP 6
DefaultThreadFactory() { 8
SecurityManager s = System.getSecurityManager(); 9
group = (s != null)? s.getThreadGroup() :10
namePrefix = "pool-" +12
poolNumber.getAndIncrement() +13
public Thread newThread(Runnable r) {17
// 调用Thread构造方法创建线程18
Thread t = new Thread(group, r,19
namePrefix + threadNumber.getAndIncrement(),20
// 取消守护线程设置22
if (t.isDaemon())23
// 设置默认优先级25
if (t.getPriority() != Thread.NORM_PRIORITY)26
t.setPriority(Thread.NORM_PRIORITY);27 28
}& & DefaultThreadFactory 是默认的线程工程,提供创建线程的方法。& &&PrivilegedThreadFactory继承自DefaultThreadFactory,区别在于线程执行的run方法指定了classLoader并受到权限的控制。& &&DelegatedExecutorService继承自AbstractExecutorService,是一个包装类,暴露ExecutorService的方法。& &&DelegatedScheduledExecutorService继承自DelegatedExecutorService,实现了ScheduledExecutorService接口。它也是一个包装类,公开ScheduledExecutorService方法。&&
