nanosecondsjava ldap需要的jar包倒入什么包 java

[Java教程]Java Executor 框架
你的位置:
[Java教程]Java Executor 框架
& & &Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自/a/jichuzhishi/0.html)& & &本篇博文分析Executor中几个比较重要的接口和类。& & &Executor1 public interface Executor {2
void execute(Runnable command);3 }& & &Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它没有直接的实现类,有一个重要的子接口ExecutorService。& & &ExecutorService 1 //继承自Executor接口 2 public interface ExecutorService extends Executor { 3
* 关闭方法,调用后执行之前提交的任务,不再接受新的任务 5
void shutdown(); 7
* 从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表 9
List&Runnable& shutdownNow();11
* 判断执行器是否已经关闭13
boolean isShutdown();15
* 关闭后所有任务是否都已完成17
boolean isTerminated();19
boolean awaitTermination(long timeout, TimeUnit unit)23
throws InterruptedE24
* 提交一个Callable任务26
&T& Future&T& submit(Callable&T& task);28
* 提交一个Runable任务,result要返回的结果30
&T& Future&T& submit(Runnable task, T result);32
* 提交一个任务34
Future&?& submit(Runnable task);36
* 执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表38
&T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks)40
throws InterruptedE41
* 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。43
&T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks,45
long timeout, TimeUnit unit)46
throws InterruptedE47
* 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。49
&T& T invokeAny(Collection&? extends Callable&T&& tasks)51
throws InterruptedException, ExecutionE52
* 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。54
&T& T invokeAny(Collection&? extends Callable&T&& tasks,56
long timeout, TimeUnit unit)57
throws InterruptedException, ExecutionException, TimeoutE58 }& & ExecutorService接口继承自Executor接口,定义了终止、提交任务、跟踪任务返回结果等方法。& & ExecutorService涉及到Runnable、Callable、Future接口,这些接口的具体内容如下。 1 // 实现Runnable接口的类将被Thread执行,表示一个基本的任务 2 public interface Runnable { 3
// run方法就是它所有的内容,就是实际执行的任务 4
public abstract void run(); 5 } 6 // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容 7 public interface Callable&V& { 8
// 相对于run方法的带有返回值的call方法 9
V call() throws E10 }Future 1 // Future代表异步任务的执行结果 2 public interface Future&V& { 3
* 尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。 6
boolean cancel(boolean mayInterruptIfRunning); 8
* 返回代表的任务是否在完成之前被取消了11
boolean isCancelled();13 14
* 如果任务已经完成,返回true16
boolean isDone();18 19
* 获取异步任务的执行结果(如果任务没执行完将等待)21
V get() throws InterruptedException, ExecutionE23 24
* 获取异步任务的执行结果(有最常等待时间的限制)26
timeout表示等待的时间,unit是它时间单位28
V get(long timeout, TimeUnit unit)30
throws InterruptedException, ExecutionException, TimeoutE31 }& & &ExecutorService有一个子接口ScheduledExecutorService和一个抽象实现类AbstractExecutorService。& & &ScheduledExecutorService 1 // 可以安排指定时间或周期性的执行任务的ExecutorService 2 public interface ScheduledExecutorService extends ExecutorService { 3
* 在指定延迟后执行一个任务,只执行一次 5
public ScheduledFuture&?& schedule(Runnable command, 7
long delay, TimeUnit unit); 8
* 与上面的方法相同,只是接受的是Callable任务10
public &V& ScheduledFuture&V& schedule(Callable&V& callable,12
long delay, TimeUnit unit);13
* 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit15
* 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...16
public ScheduledFuture&?& scheduleAtFixedRate(Runnable command,18
long initialDelay,19
long period,20
TimeUnit unit);21
* 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit23
* 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...24
public ScheduledFuture&?& scheduleWithFixedDelay(Runnable command,26
long initialDelay,27
long delay,28
TimeUnit unit);29 }& &&ScheduledExecutorService定义了四个方法,已经在上面给出基本的解释。ScheduledExecutorService有两个实现类,分别是DelegatedScheduledExecutorService和ScheduledThreadPoolExecutor,将在后面介绍。还需要解释的是ScheduledFuture。& &&ScheduledFuture继承自Future和Delayed接口,自身没有添加方法。Delayed接口定义了一个获取剩余延迟的方法。& & &AbstractExecutorService
1 // 提供ExecutorService的默认实现
2 public abstract class AbstractExecutorService implements ExecutorService {
* 为指定的Runnable和value构造一个FutureTask,value表示默认被返回的Future
protected &T& RunnableFuture&T& newTaskFor(Runnable runnable, T value) {
return new FutureTask&T&(runnable, value);
* 为指定的Callable创建一个FutureTask 11
protected &T& RunnableFuture&T& newTaskFor(Callable&T& callable) { 13
return new FutureTask&T&(callable); 14
* 提交Runnable任务 17
public Future&?& submit(Runnable task) { 19
if (task == null) throw new NullPointerException(); 20
// 通过newTaskFor方法构造RunnableFuture,默认的返回值是null 21
RunnableFuture&Object& ftask = newTaskFor(task, null); 22
// 调用具体实现的execute方法 23
execute(ftask); 24
* 提交Runnable任务 28
public &T& Future&T& submit(Runnable task, T result) { 30
if (task == null) throw new NullPointerException(); 31
// 通过newTaskFor方法构造RunnableFuture,默认的返回值是result 32
RunnableFuture&T& ftask = newTaskFor(task, result); 33
execute(ftask); 34
* 提交Callable任务 38
public &T& Future&T& submit(Callable&T& task) { 40
if (task == null) throw new NullPointerException(); 41
RunnableFuture&T& ftask = newTaskFor(task); 42
execute(ftask); 43
* doInvokeAny的具体实现(核心内容),其它几个方法都是重载方法,都对这个方法进行调用 48
* tasks 是被执行的任务集,timed标志是否定时的,nanos表示定时的情况下执行任务的限制时间 49
private &T& T doInvokeAny(Collection&? extends Callable&T&& tasks, 51
boolean timed, long nanos) 52
throws InterruptedException, ExecutionException, TimeoutException { 53
// tasks空判断 54
if (tasks == null) 55
throw new NullPointerException(); 56
// 任务数量 57
int ntasks = tasks.size(); 58
if (ntasks == 0) 59
throw new IllegalArgumentException(); 60
// 创建对应数量的Future返回集 61
List&Future&T&& futures= new ArrayList&Future&T&&(ntasks); 62
ExecutorCompletionService&T& ecs = 63
new ExecutorCompletionService&T&(this); 64
// 执行异常 66
ExecutionException ee = 67
// System.nanoTime()根据系统计时器当回当前的纳秒值 68
long lastTime = (timed)? System.nanoTime() : 0; 69
// 获取任务集的遍历器 70
Iterator&? extends Callable&T&& it = tasks.iterator(); 71
// 向执行器ExecutorCompletionService提交一个任务,并将结果加入futures中 73
futures.add(ecs.submit(it.next 74
// 修改任务计数器 75
// 活跃任务计数器 77
int active = 1; 78
for (;;) { 79
// 获取并移除代表已完成任务的Future,如果不存在,返回null 80
Future&T& f = ecs.poll(); 81
if (f == null) { 82
// 没有任务完成,且任务集中还有未提交的任务 83
if (ntasks & 0) { 84
// 剩余任务计数器减1 85
// 提交任务并添加结果 87
futures.add(ecs.submit(it.next())); 88
// 活跃任务计数器加1 89
// 没有剩余任务,且没有活跃任务(所有任务可能都会取消),跳过这一次循环 92
else if (active == 0) 93
else if (timed) { 95
// 获取并移除代表已完成任务的Future,如果不存在,会等待nanos指定的纳秒数 96
f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 97
if (f == null) 98
throw new TimeoutException(); 99
// 计算剩余可用时间100
long now = System.nanoTime();101
nanos -= now - lastT102
lastTime =103
// 获取并移除表示下一个已完成任务的未来,等待,如果目前不存在。106
// 执行到这一步说明已经没有任务任务可以提交,只能等待某一个任务的返回107
f = ecs.take();108
// f不为空说明有一个任务完成了110
if (f != null) {111
// 已完成一个任务,所以活跃任务计数减1112
// 返回该任务的结果115
return f.get();116
} catch (InterruptedException ie) {117
} catch (ExecutionException eex) {119
} catch (RuntimeException rex) {121
ee = new ExecutionException(rex);122
// 如果没有成功返回结果则抛出异常126
if (ee == null)127
ee = new ExecutionException();128 129 130
} finally {131
// 无论执行中发生异常还是顺利结束,都将取消剩余未执行的任务132
for (Future&T& f : futures)133
f.cancel(true);134
public &T& T invokeAny(Collection&? extends Callable&T&& tasks)138
throws InterruptedException, ExecutionException {139
// 非定时任务的doInvokeAny调用141
return doInvokeAny(tasks, false, 0);142
} catch (TimeoutException cannotHappen) {143
// 定时任务的invokeAny调用,timeout表示超时时间,unit表示时间单位148
public &T& T invokeAny(Collection&? extends Callable&T&& tasks,149
long timeout, TimeUnit unit)150
throws InterruptedException, ExecutionException, TimeoutException {151
return doInvokeAny(tasks, true, unit.toNanos(timeout));152
// 无超时设置的invokeAll方法154
public &T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks)155
throws InterruptedException {156
// 空任务判断157
if (tasks == null)158
throw new NullPointerException();159
// 创建大小为任务数量的结果集160
List&Future&T&& futures = new ArrayList&Future&T&&(tasks.size());161
// 是否完成所有任务的标记162
boolean done =163
// 遍历并执行任务165
for (Callable&T& t : tasks) {166
RunnableFuture&T& f = newTaskFor(t);167
futures.add(f);168
execute(f);169
// 遍历结果集171
for (Future&T& f : futures) {172
// 如果某个任务没完成,通过f调用get()方法173
if (!f.isDone()) {174
// get方法等待计算完成,然后获取结果(会等待)。所以调用get后任务就会完成计算,否则会等待176
f.get();177
} catch (CancellationException ignore) {178
} catch (ExecutionException ignore) {179
// 标志所有任务执行完成183
// 返回结果185
} finally {187
// 假如没有完成所有任务(可能是发生异常等情况),将任务取消188
if (!done)189
for (Future&T& f : futures)190
f.cancel(true);191
// 超时设置的invokeAll方法194
public &T& List&Future&T&& invokeAll(Collection&? extends Callable&T&& tasks,195
long timeout, TimeUnit unit)196
throws InterruptedException {197
// 需要执行的任务集为空或时间单位为空,抛出异常198
if (tasks == null || unit == null)199
throw new NullPointerException();200
// 将超时时间转为纳秒单位201
long nanos = unit.toNanos(timeout);202
// 创建任务结果集203
List&Future&T&& futures = new ArrayList&Future&T&&(tasks.size());204
// 是否全部完成的标志205
boolean done =206
// 遍历tasks,将任务转为RunnableFuture208
for (Callable&T& t : tasks)209
futures.add(newTaskFor(t));210
// 记录当前时间(单位是纳秒)211
long lastTime = System.nanoTime();212
// 获取迭代器213
Iterator&Future&T&& it = futures.iterator();214
// 遍历215
while (it.hasNext()) {216
// 执行任务217
execute((Runnable)(it.next()));218
// 记录当前时间219
long now = System.nanoTime();220
// 计算剩余可用时间221
nanos -= now - lastT222
// 更新上一次执行时间223
lastTime =224
// 超时,返回保存任务状态的结果集225
if (nanos &= 0)226
for (Future&T& f : futures) {230
// 如果有任务没完成231
if (!f.isDone()) {232
// 时间已经用完,返回保存任务状态的结果集233
if (nanos &= 0)234
// 获取计算结果,最多等待给定的时间nanos,单位是纳秒237
f.get(nanos, TimeUnit.NANOSECONDS);238
} catch (CancellationException ignore) {239
} catch (ExecutionException ignore) {240
} catch (TimeoutException toe) {241
// 计算可用时间244
long now = System.nanoTime();245
nanos -= now - lastT246
lastTime =247
// 修改是否全部完成的标记250
// 返回结果集252
} finally {254
// 假如没有完成所有任务(可能是时间已经用完、发生异常等情况),将任务取消255
if (!done)256
for (Future&T& f : futures)257
f.cancel(true);258
}260 }& & AbstractExecutor实现了ExecutorService接口的部分方法。具体代码的分析在上面已经给出。& &&AbstractExecutor有两个子类:DelegatedExecutorService、ThreadPoolExecutor。将在后面介绍。& & 下面是AbstractExecutor中涉及到的RunnableFuture、FutureTask、ExecutorCompletionService。& &&RunnableFuture继承自Future和Runnable,只有一个run()方法(Runnable中已经有一个run方法了,为什么RunnableFuture还要重新写一个run方法呢?求高手指教)。RunnableFuture接口看上去就像是Future和Runnable两个接口的组合。& & FutureTask实现了RunnableFuture接口,除了实现了Future和Runnable中的方法外,它还有自己的方法和一个内部类Sync。& &&ExecutorCompletionService实现了CompletionService接口,将结果从复杂的一部分物种解耦出来。这些内容后续会介绍,不过这里先介绍框架中的其它内容,弄清整体框架。& & 下面看继承自AbstractExecutorService的ThreadPoolExecutor。& &&ThreadPoolExecutorThreadPoolExecutor(好长)
1 public class ThreadPoolExecutor extends AbstractExecutorService {
// 检查关闭的权限
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/* runState提供了主要的生命周期控制,可取值有以下几个:
* RUNNING:接受新的任务,处理队列中的任务
* SHUTDOWN:不再接受新的任务,但是处理队列中的任务
* STOP:不接受新任务,也不处理队列中的任务,打断正在处理的任务
* TERMINATED:和STOP类似,同时终止所有线程 10
* RUNNING -& SHUTDOWN 11
On invocation of shutdown(), perhaps implicitly in finalize() 12
* (RUNNING or SHUTDOWN) -& STOP 13
On invocation of shutdownNow() 14
* SHUTDOWN -& TERMINATED 15
When both queue and pool are empty 16
* STOP -& TERMINATED 17
When pool is empty 18
volatile int runS 21
static final int RUNNING
static final int SHUTDOWN
static final int STOP
static final int TERMINATED = 3; 25
// 用于保持任务的队列 27
private final BlockingQueue&Runnable& workQ 28
// poolSize, corePoolSize, maximumPoolSize, runState, workers set的更新锁 29
private final ReentrantLock mainLock = new ReentrantLock(); 30
// mainLock锁的一个Condition实例 31
private final Condition termination = mainLock.newCondition(); 32
// 保持线程池中所有的工作线程。只有获取mainLock锁后才能访问。 33
private final HashSet&Worker& workers = new HashSet&Worker&(); 34
// 空闲线程的等待时间,大为是纳秒 35
private volatile long
keepAliveT 36
// 是否允许核心线程&活着& false(默认值)允许,哪怕空闲;true则使用keepAliveTime来控制等待超时时间 37
private volatile boolean allowCoreThreadTimeO 38
// 核心线程池的大小 39
private volatile int
corePoolS 40
// pool size最大值 41
private volatile int
maximumPoolS 42
// 当前pool大小 43
private volatile int
// 拒绝执行的处理器 顾名思义,当一个任务被拒绝执行后将同个这个处理器进行处理 45
private volatile RejectedExecutionH 46
// 线程工厂,用于创建线程 47
private volatile ThreadFactory threadF 48
// 最终pool size达到的最大值 49
private int largestPoolS 50
// 已完成任务计数 51
private long completedTaskC 52
// 默认的拒绝执行的处理器 53
private static final RejectedExecutionHandler defaultHandler = 54
new AbortPolicy(); 55
* 关于借个size的说明: 57
* 线程池数量poolSize指工作线程Worker对象的集合workers的实际大小,通过workers.size()可直接获得。
* 核心线程池数量corePoolSize,可理解为工作线程Worker对象的集合workers的目标大小。 59
* 如果poolSize & corePoolSize,那么ThreadPoolExecutor就会有机制在适当的时候回收闲置的线程。 60
* 最大线程池数量maxPoolSize,就是工作线程Worker对象的集合workers的大小上限。 61
* 假如说任务队列满了,再来新任务时,若poolSize还没达到maxPoolSize,则继续创建新的线程来执行新任务, 62
* 若不幸poolSize达到了上限maxPoolSize,那不能再创建新的线程了,只能采取reject策略来拒绝新任务。 63
/** 构造方法 开始*/ 65
public ThreadPoolExecutor(int corePoolSize, 66
int maximumPoolSize, 67
long keepAliveTime, 68
TimeUnit unit, 69
BlockingQueue&Runnable& workQueue) { 70
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 71
Executors.defaultThreadFactory(), defaultHandler); 72
public ThreadPoolExecutor(int corePoolSize, 74
int maximumPoolSize, 75
long keepAliveTime, 76
TimeUnit unit, 77
BlockingQueue&Runnable& workQueue, 78
ThreadFactory threadFactory) { 79
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 80
threadFactory, defaultHandler); 81
public ThreadPoolExecutor(int corePoolSize, 83
int maximumPoolSize, 84
long keepAliveTime, 85
TimeUnit unit, 86
BlockingQueue&Runnable& workQueue, 87
RejectedExecutionHandler handler) { 88
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 89
Executors.defaultThreadFactory(), handler); 90
// 主要的构造方法,其它构造方法都是对这个方法的调用 92
public ThreadPoolExecutor(int corePoolSize, 93
int maximumPoolSize, 94
long keepAliveTime, 95
TimeUnit unit, 96
BlockingQueue&Runnable& workQueue, 97
ThreadFactory threadFactory, 98
RejectedExecutionHandler handler) { 99
// 非法输入(明显这些值都是不能小于0的)100
if (corePoolSize & 0 ||101
maximumPoolSize &= 0 ||102
maximumPoolSize & corePoolSize ||103
keepAliveTime & 0)104
throw new IllegalArgumentException();105
// 空判断106
if (workQueue == null || threadFactory == null || handler == null)107
throw new NullPointerException();108
this.corePoolSize = corePoolS109
this.maximumPoolSize = maximumPoolS110
this.workQueue = workQ111
this.keepAliveTime = unit.toNanos(keepAliveTime);112
this.threadFactory = threadF113
this.handler =114
/** 构造方法 结束*/116
// 执行Runnable任务119
public void execute(Runnable command) {120
if (command == null)121
throw new NullPointerException();122
/*如果当前线程数量poolSize&=核心线程数量corePoolSize,123
那当然无法再把当前任务加入到核心线程池中执行了,于是进花括号选择其他的策略执行;124
如果poolSize没有达到corePoolSize,那很自然是把当前任务放到核心线程池执行,125
也就是执行逻辑或运算符后的方法addIfUnderCorePoolSize(command)。126
&放到核心线程池执行&是什么意思呢?127
就是new 一个新工作线程放到workers集合中,让这个新线程来执行当前的任务command,而这个新线程可以认为是核心线程池中的其中一个线程。*/128
if (poolSize &= corePoolSize || !addIfUnderCorePoolSize(command)) {129
// 线程池状态时RUNNING且能将任务添加到worker队列中130
if (runState == RUNNING && workQueue.offer(command)) {131
// 加入了队列以后,只要保证有工作线程就ok了,工作线程会自动去执行任务队列的。132
// 所以判断一下if ( runState != RUNNING || poolSize == 0),133
// 在这个if为true时候,去保证一下任务队列有线程会执行,即执行ensureQueuedTaskHandled(command)方法。134
// 这里有两种情况,情况一:runState != RUNNING,这种情况在ensureQueuedTaskHandled方法中会把任务丢给reject拒绝策略处理,135
// 情况二:poolSize == 0,这种情况是new一个新线程加入到工作线程集合workers中。136
if (runState != RUNNING || poolSize == 0)137
ensureQueuedTaskHandled(command);138
// 程序执行到这个分支,说明当前状态runState != RUNNING,或者任务队列workQueue已经满了。140
// 先看第一个条件下,前面解释过runState,除了RUNNING状态,其他三个状态都不能接收新任务,141
// 所以当runState != RUNNING时新任务只能根据reject策略拒绝,142
// 而这个拒绝的逻辑是在addIfUnderMaximumPoolSize方法中实现的;143
// 再看第二个条件下,workQueue已经满,潜在的条件是runState == RUNNING,这种情况怎么处理新任务呢?144
// 很简单,若当前线程数量poolSize没有达到最大线程数量maxPoolSize,145
// 则创建新的线程去执行这个无法加入任务队列的新任务,146
// 否则就根据reject策略拒绝147
else if (!addIfUnderMaximumPoolSize(command))148
reject(command); // is shutdown or saturated149
private Thread addThread(Runnable firstTask) {153
Worker w = new Worker(firstTask);154
// 创建一个新Thread t155
Thread t = threadFactory.newThread(w);156
if (t != null) {157
w.thread =158
workers.add(w);159
int nt = ++poolS160
// 跟踪线程池大小的最大值161
if (nt & largestPoolSize)162
largestPoolSize =163
// 创建并启动新线程执行firstTask(在运行线程数小于核心线程池大小的情况且状态是RUNNING)168
private boolean addIfUnderCorePoolSize(Runnable firstTask) {169
Thread t =170
final ReentrantLock mainLock = this.mainL171
// 获取锁172
mainLock.lock();173
if (poolSize & corePoolSize && runState == RUNNING)175
// 创建一个新线程176
t = addThread(firstTask);177
} finally {178
// 释放锁179
mainLock.unlock();180
if (t == null)182
// 启动线程执行任务184
t.start();185186
// 创建并启动新线程执行firstTask(在运行线程数小于pool size的最大值的情况且状态是RUNNING)189
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {190
Thread t =191
final ReentrantLock mainLock = this.mainL192
mainLock.lock();193
if (poolSize & maximumPoolSize && runState == RUNNING)195
t = addThread(firstTask);196
} finally {197
mainLock.unlock();198
if (t == null)200
t.start();202203
// 确保任务被处理206
private void ensureQueuedTaskHandled(Runnable command) {207
final ReentrantLock mainLock = this.mainL208
mainLock.lock();209
// 拒绝标记210
boolean reject =211
Thread t =212
int state = runS214
// 如果状态不是RUNNING,能成功从worker队列中移除,则拒绝这个任务执行215
if (state != RUNNING && workQueue.remove(command))216
reject =217
else if (state & STOP &&218
poolSize & Math.max(corePoolSize, 1) &&219
!workQueue.isEmpty())220
t = addThread(null);221
} finally {222
mainLock.unlock();223
if (reject)225
reject(command);226
else if (t != null)227
// 不用拒绝任务则启动线程执行任务228
t.start();229
// 调用RejectedExecutionHandler决绝任务232
void reject(Runnable command) {233
handler.rejectedExecution(command, this);234
// 工作线程,实现了Runnable接口236
private final class Worker implements Runnable {237
// 每个任务执行都必须获取和释放runLock。这主要是防止中断的目的是取消工作线程,而不是中断正在运行的任务。238
private final ReentrantLock runLock = new ReentrantLock();239
// 要执行的任务240
private Runnable firstT241
// 每个线程完成任务的计数器,最后会统计到completedTaskCount中242
volatile long completedT243
// 用于执行Runnable的线程244
// 构造方法246
Worker(Runnable firstTask) {247
this.firstTask = firstT248
// 判断这个线程是否活动250
boolean isActive() {251
return runLock.isLocked();252
// 中断闲置线程254
void interruptIfIdle() {255
final ReentrantLock runLock = this.runL256
if (runLock.tryLock()) {257
if (thread != Thread.currentThread())259
thread.interrupt();260
} finally {261
runLock.unlock();262
// 中断266
void interruptNow() {267
thread.interrupt();268
private void runTask(Runnable task) {272
final ReentrantLock runLock = this.runL273
runLock.lock();274
if (runState & STOP &&277
Thread.interrupted() &&278
runState &= STOP)279
thread.interrupt();280
boolean ran =282
beforeExecute(thread, task);283
task.run();285
afterExecute(task, null);287
++completedT288
} catch (RuntimeException ex) {289
if (!ran)290
afterExecute(task, ex);291
} finally {294
runLock.unlock();295
public void run() {300
Runnable task = firstT302
firstTask =303
* 注意这段while循环的执行逻辑,每执行完一个核心线程后,就会去线程池 305
* 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止 306
while (task != null || (task = getTask()) != null) {308
//你所提交的核心线程(任务)的运行逻辑
runTask(task);310
} finally {313
// 当前工作线程退出
workerDone(this);315
// 从池队列中取的核心线程(任务)的方法320
Runnable getTask() {321
for (;;) {322
// 获取运行状态324
int state = runS325
// 大于SHUTDOWN,即STOP和TERMINATED状态,没有任务326
if (state & SHUTDOWN)327
// SHUTDOWN状态330
if (state == SHUTDOWN)
// 帮助清空队列331
r = workQueue.poll();332
// 状态时RUNNING,且poolSize & corePoolSize或allowCoreThreadTimeOut333
else if (poolSize & corePoolSize || allowCoreThreadTimeOut)334
// 获取并移除元素(等待指定的时间)335
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);336
// 获取并移除元素(会一直等待知道获取到有效元素)338
r = workQueue.take();339
// 获取结果不为空,返回340
if (r != null)341
// 检查一个获取任务失败的线程能否退出343
if (workerCanExit()) {344
if (runState &= SHUTDOWN) // 中断其他线程345
interruptIdleWorkers();346
// Else retry349
} catch (InterruptedException ie) {350
// On interruption, re-check runState351
// 检查一个获取任务失败的线程能否退出356
private boolean workerCanExit() {357
final ReentrantLock mainLock = this.mainL358
mainLock.lock();359
boolean canE360
// 可以退出的条件是状态为STOP或TERMINATED或至少有一个处理非空队列的线程(在允许超时的情况下)362
canExit = runState &= STOP ||363
workQueue.isEmpty() ||364
(allowCoreThreadTimeOut &&365
poolSize & Math.max(1, corePoolSize));366
} finally {367
mainLock.unlock();368
return canE370
// 中断其他线程373
void interruptIdleWorkers() {374
final ReentrantLock mainLock = this.mainL375
mainLock.lock();376
// 遍历工作线程378
for (Worker w : workers)379
// 尝试中断闲置线程380
w.interruptIfIdle();381
} finally {382
mainLock.unlock();383
// 工作线程退出要处理的逻辑 386
void workerDone(Worker w) {387
final ReentrantLock mainLock = this.mainL388
mainLock.lock();389
completedTaskCount += w.completedT391
workers.remove(w);//从工作线程缓存中删除
if (--poolSize == 0)//poolSize减一,这时其实又可以创建工作线程了
tryTerminate();//尝试终止
} finally {395
mainLock.unlock();396
// 尝试终止400
private void tryTerminate() {401
//终止的前提条件就是线程池里已经没有工作线程(Worker)了
if (poolSize == 0) {403
int state = runS404
* 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个 406
* 工作线程来执行线程队列中等待的任务 407
if (state & STOP && !workQueue.isEmpty()) {409
state = RUNNING; // disable termination check below410
Thread t = addThread(null);411
if (t != null)412
t.start();413
// 设置池状态为终止状态
if (state == STOP || state == SHUTDOWN) {416
runState = TERMINATED;417
termination.signalAll();418
terminated();419
// 发起一个有序的关闭在以前已提交任务的执行,但不接受新任务。如果已经关闭,调用不会有其他影响。423
public void shutdown() {424
// Gets the system security interface.425
SecurityManager security = System.getSecurityManager();426
if (security != null)427
// 检查权限(以抛出异常的形式)428
security.checkPermission(shutdownPerm);429
final ReentrantLock mainLock = this.mainL430
mainLock.lock();431
if (security != null) { // 检查调用者是否能修改线程433
for (Worker w : workers)434
security.checkAccess(w.thread);435
// 获取运行状态437
int state = runS438
// 小于SHUTDOWN的不就是RUNNING么。。。439
if (state & SHUTDOWN)440
runState = SHUTDOWN;441 442
for (Worker w : workers) {444
// 中断线程445
w.interruptIfIdle();446
} catch (SecurityException se) { // Try to back out448
runState =449
// tryTerminate() here would be a no-op450
// 尝试终止453
tryTerminate(); // Terminate now if pool and queue empty454
} finally {455
mainLock.unlock();456
public List&Runnable& shutdownNow() {461
SecurityManager security = System.getSecurityManager();462
if (security != null)463
security.checkPermission(shutdownPerm);464 465
final ReentrantLock mainLock = this.mainL466
mainLock.lock();467
if (security != null) { // Check if caller can modify our threads469
for (Worker w : workers)470
security.checkAccess(w.thread);471
int state = runS474
// 与上一个方法主要区别在于状态和interruptNow方法475
if (state & STOP)476
runState = STOP;477 478
for (Worker w : workers) {480
w.interruptNow();481
} catch (SecurityException se) { // Try to back out483
runState =484
// tryTerminate() here would be a no-op485
List&Runnable& tasks = drainQueue();489
tryTerminate(); // Terminate now if pool and queue empty490
} finally {492
mainLock.unlock();493
// 清空队列497
private List&Runnable& drainQueue() {498
List&Runnable& taskList = new ArrayList&Runnable&();499
// 将队列中的所有元素一到taskList中500
workQueue.drainTo(taskList);501
while (!workQueue.isEmpty()) {502
Iterator&Runnable& it = workQueue.iterator();503
if (it.hasNext()) {505
Runnable r = it.next();506
// 从workQueue中移除,并添加到taskList中507
if (workQueue.remove(r))508
taskList.add(r);509
} catch (ConcurrentModificationException ignore) {511
return taskL514
public boolean isShutdown() {517
return runState != RUNNING;518
boolean isStopped() {522
return runState == STOP;523
public boolean isTerminating() {527
int state = runS528
return state == SHUTDOWN || state == STOP;529
public boolean isTerminated() {532
return runState == TERMINATED;533
public boolean awaitTermination(long timeout, TimeUnit unit)536
throws InterruptedException {537
long nanos = unit.toNanos(timeout);538
final ReentrantLock mainLock = this.mainL539
mainLock.lock();540
for (;;) {542
if (runState == TERMINATED)543
if (nanos &= 0)545
nanos = termination.awaitNanos(nanos);547
} finally {549
mainLock.unlock();550
protected void finalize()
shutdown();556
public void setThreadFactory(ThreadFactory threadFactory) {560
if (threadFactory == null)561
throw new NullPointerException();562
this.threadFactory = threadF563
public ThreadFactory getThreadFactory() {567
return threadF568
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {572
if (handler == null)573
throw new NullPointerException();574
this.handler =575
public RejectedExecutionHandler getRejectedExecutionHandler() {579
// 设置核心线程数 这里的设置将覆盖构造方法中的设置583
// 如果小于构造方法的设置,多余的线程将被闲置584
// 如果大于构造方法的设置,新线程将被用于执行排队的任务585
public void setCorePoolSize(int corePoolSize) {586
if (corePoolSize & 0)587
throw new IllegalArgumentException();588
final ReentrantLock mainLock = this.mainL589
mainLock.lock();590
int extra = this.corePoolSize - corePoolS592
this.corePoolSize = corePoolS593
// 大于构造方法的设置594
if (extra & 0) {595
int n = workQueue.size(); 596
while (extra++ & 0 && n-- & 0 && poolSize & corePoolSize) {597
Thread t = addThread(null);598
if (t != null)599
t.start();600
// 小于构造方法的设置605
else if (extra & 0 && poolSize & corePoolSize) {606
Iterator&Worker& it = workers.iterator();608
while (it.hasNext() &&609
extra-- & 0 &&610
poolSize & corePoolSize &&611
workQueue.remainingCapacity() == 0)612
it.next().interruptIfIdle();613
} catch (SecurityException ignore) {614
// N it is OK if the threads stay live615
} finally {618
mainLock.unlock();619
public int getCorePoolSize() {624
return corePoolS625
public boolean prestartCoreThread() {629
return addIfUnderCorePoolSize(null);630
public int prestartAllCoreThreads() {634
int n = 0;635
while (addIfUnderCorePoolSize(null))636
++n;637638
public boolean allowsCoreThreadTimeOut() {642
return allowCoreThreadTimeO643
public void allowCoreThreadTimeOut(boolean value) {647
if (value && keepAliveTime &= 0)648
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");649 650
allowCoreThreadTimeOut =651
// 设置所允许的最大的线程数。这将覆盖在构造函数中设置的任何值。如果新值小于当前值,多余的现有线程将被终止时,他们成为闲置。654
public void setMaximumPoolSize(int maximumPoolSize) {655
if (maximumPoolSize &= 0 || maximumPoolSize & corePoolSize)656
throw new IllegalArgumentException();657
final ReentrantLock mainLock = this.mainL658
mainLock.lock();659
int extra = this.maximumPoolSize - maximumPoolS661
this.maximumPoolSize = maximumPoolS662
if (extra & 0 && poolSize & maximumPoolSize) {663
Iterator&Worker& it = workers.iterator();665
while (it.hasNext() &&666
extra & 0 &&667
poolSize & maximumPoolSize) {668
it.next().interruptIfIdle();669
} catch (SecurityException ignore) {672
// N it is OK if the threads stay live673
} finally {676
mainLock.unlock();677
public int getMaximumPoolSize() {682
return maximumPoolS683
public void setKeepAliveTime(long time, TimeUnit unit) {687
if (time & 0)688
throw new IllegalArgumentException();689
if (time == 0 && allowsCoreThreadTimeOut())690
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");691
this.keepAliveTime = unit.toNanos(time);692
public long getKeepAliveTime(TimeUnit unit) {696
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);697
public BlockingQueue&Runnable& getQueue() {701
return workQ702
public boolean remove(Runnable task) {706
return getQueue().remove(task);707
// 移除所有被取消的任务710
public void purge() {711
// Fail if we encounter interference during traversal712
Iterator&Runnable& it = getQueue().iterator();714
while (it.hasNext()) {715
Runnable r = it.next();716
if (r instanceof Future&?&) {717
Future&?& c = (Future&?&)r;718
if (c.isCancelled())719
it.remove();720
catch (ConcurrentModificationException ex) {724725
public int getPoolSize() {730
return poolS731
// 获取活跃线程数734
public int getActiveCount() {735
final ReentrantLock mainLock = this.mainL736
mainLock.lock();737
int n = 0;739
for (Worker w : workers) {740
if (w.isActive())741
} finally {745
mainLock.unlock();746
public int getLargestPoolSize() {751
final ReentrantLock mainLock = this.mainL752
mainLock.lock();753
return largestPoolS755
} finally {756
mainLock.unlock();757
public long getTaskCount() {762
final ReentrantLock mainLock = this.mainL763
mainLock.lock();764
long n = completedTaskC766
for (Worker w : workers) {767
// 统计已经完成的任务768
n += w.completedT769
// 如果w是活跃线程,说明正在执行一个任务,所以n加一770
if (w.isActive())771
// 加上队列中的任务774
return n + workQueue.size();775
} finally {776
mainLock.unlock();777
// 获取已完成的任务数781
public long getCompletedTaskCount() {782
final ReentrantLock mainLock = this.mainL783
mainLock.lock();784
long n = completedTaskC786
for (Worker w : workers)787
n += w.completedT788 789
} finally {790
mainLock.unlock();791
protected void beforeExecute(Thread t, Runnable r) { }796 797
protected void afterExecute(Runnable r, Throwable t) { }799 800
protected void terminated() { }802 803
// 实现了RejectedExecutionHandler,即是一个拒绝执行的Handler804
public static class CallerRunsPolicy implements RejectedExecutionHandler {805
public CallerRunsPolicy() { }807 808
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {810
if (!e.isShutdown()) {811
r.run();812
public static class AbortPolicy implements RejectedExecutionHandler {818
public AbortPolicy() { }820 821
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {823
throw new RejectedExecutionException();824
public static class DiscardPolicy implements RejectedExecutionHandler {829
public DiscardPolicy() { }831 832
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {834
public static class DiscardOldestPolicy implements RejectedExecutionHandler {839
public DiscardOldestPolicy() { }841 842
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {844
if (!e.isShutdown()) {845
e.getQueue().poll();846
e.execute(r);847
}850 }& & 可以参考http://xtu-/blog/647744& & 从上面的框架结构图中可以可以看出剩下的就是ScheduledThreadPoolExecutor和Executors。Executors是一个工具类,提供一些工厂和实用方法。& & 下面看ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口。& &&ScheduledThreadPoolExecutorScheduledThreadPoolExecutor// 可以安排指定时间或周期性的执行任务的ExecutorServicepublic class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
// 在Shutdown的时候如果要取消或关闭任务,设置为false;true表示继续执行任务,在Shutdown之后
private volatile boolean continueExistingPeriodicTasksAfterS
// false表示在Shutdown的时候取消Delayed的任务;true表示执行这个任务
private volatile boolean executeExistingDelayedTasksAfterShutdown =
// 打破调度联系,进而保证先进先出的顺序捆绑项目中的序列号
private static final AtomicLong sequencer = new AtomicLong(0);
// 基准时间
private static final long NANO_ORIGIN = System.nanoTime();
// 当前时间(相对于基准时间的值)
final long now() {
return System.nanoTime() - NANO_ORIGIN;
// RunnableScheduledFuture接口表示是否是周期性的
private class ScheduledFutureTask&V&
extends FutureTask&V& implements RunnableScheduledFuture&V& {
private final long sequenceN
// 预定安排执行的时刻
// 表示重复任务,0表示不重复,正数表示固定比率,负数表示固定延时
// 构造方法,构造一个只执行一次的任务
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time =
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
// 构造方法,构造一个按指定ns开始执行,指定period周期性执行
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time =
this.period =
this.sequenceNumber = sequencer.getAndIncrement();
// 构造方法,构造一个延时执行的任务
ScheduledFutureTask(Callable&V& callable, long ns) {
super(callable);
this.time =
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
// 按指定单位获取延时时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
// 判断传入延时和这个任务延时之间的大小关系
public int compareTo(Delayed other) {
// 为什么可以和Delayed比较?因为这个类实现了RunnableScheduledFuture接口,而RunnableScheduledFuture接口继承自Delayed接口
if (other == this) // compare zero ONLY if same object
// other是ScheduledFutureTask实例
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask&?& x = (ScheduledFutureTask&?&)
long diff = time - x.
// 比较大小
if (diff & 0)
return -1;
else if (diff & 0)
else if (sequenceNumber & x.sequenceNumber)
return -1;
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0)? 0 : ((d & 0)? -1 : 1);
// 是否周期性的(包括延时的情况)
public boolean isPeriodic() {
return period != 0;
// 执行周期性的任务
private void runPeriodic() {
// 执行任务
boolean ok = ScheduledFutureTask.super.runAndReset();
// 判断是否已经shutdown
boolean down = isShutdown();
// 重新安排任务(如果没有shutdown或在没有关闭且允许在shutdown之后执行已存在的任务)
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
if (p & 0)
// 计算下一次执行的时间
// 计算触发时间
time = triggerTime(-p);
// 将任务添加到队列中
ScheduledThreadPoolExecutor.super.getQueue().add(this);
else if (down)
interruptIdleWorkers();
// 执行任务,根据是否周期性调用不同的方法
public void run() {
if (isPeriodic())
runPeriodic();
ScheduledFutureTask.super.run();
// 延迟执行
private void delayedExecute(Runnable command) {
// 如果已经shutdown,决绝任务
if (isShutdown()) {
reject(command);
if (getPoolSize() & getCorePoolSize())
// 预启动线程
prestartCoreThread();
super.getQueue().add(command);
// 取消和清除关闭政策不允许运行的任务
private void cancelUnwantedTasks() {
// 获取shutdown策略
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic)
super.getQueue().clear();
else if (keepDelayed || keepPeriodic) {
Object[] entries = super.getQueue().toArray();
for (int i = 0; i & entries. ++i) {
Object e = entries[i];
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture&?& t = (RunnableScheduledFuture&?&)e;
// 根据是否周期性的任务通过制定的值判断进行取消操作
if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
t.cancel(false);
// 净化,移除已经取消的任务
public boolean remove(Runnable task) {
if (!(task instanceof RunnableScheduledFuture))
return getQueue().remove(task);
protected &V& RunnableScheduledFuture&V& decorateTask(
Runnable runnable, RunnableScheduledFuture&V& task) {
protected &V& RunnableScheduledFuture&V& decorateTask(
Callable&V& callable, RunnableScheduledFuture&V& task) {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), handler);
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay & 0) ? 0 : delay));
long triggerTime(long delay) {
return now() +
((delay & (Long.MAX_VALUE && 1)) ? delay : overflowFree(delay));
// 避免移除,返回延迟的值
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
if (headDelay & 0 && (delay - headDelay & 0))
delay = Long.MAX_VALUE + headD
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
// 根据执行的延时时间执行任务
public ScheduledFuture&?& schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// ScheduledFutureTask的result为null
RunnableScheduledFuture&?& t = decorateTask(command,
new ScheduledFutureTask&Void&(command, null,
triggerTime(delay, unit)));
// 延时执行
delayedExecute(t);
// 上一个方法的重载形式,接收的是Callable
public &V& ScheduledFuture&V& schedule(Callable&V& callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture&V& t = decorateTask(callable,
new ScheduledFutureTask&V&(callable,
triggerTime(delay, unit)));
delayedExecute(t);
* 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit
* 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...
public ScheduledFuture&?& scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period &= 0)
throw new IllegalArgumentException();
RunnableScheduledFuture&?& t = decorateTask(command,
new ScheduledFutureTask&Object&(command,
triggerTime(initialDelay, unit),
unit.toNanos(period)));
delayedExecute(t);
* 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit
* 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...
public ScheduledFuture&?& scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay &= 0)
throw new IllegalArgumentException();
RunnableScheduledFuture&?& t = decorateTask(command,
new ScheduledFutureTask&Boolean&(command,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)));
delayedExecute(t);
// 执行任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 立即执行,延时时间是0
schedule(command, 0, TimeUnit.NANOSECONDS);
// 重新 AbstractExecutorService 的方法
public Future&?& submit(Runnable task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
public &T& Future&T& submit(Runnable task, T result) {
return schedule(Executors.callable(task, result),
0, TimeUnit.NANOSECONDS);
public &T& Future&T& submit(Callable&T& task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
continueExistingPeriodicTasksAfterShutdown =
if (!value && isShutdown())
cancelUnwantedTasks();
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
return continueExistingPeriodicTasksAfterS
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
executeExistingDelayedTasksAfterShutdown =
if (!value && isShutdown())
cancelUnwantedTasks();
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
return executeExistingDelayedTasksAfterS
public void shutdown() {
// 取消任务
cancelUnwantedTasks();
super.shutdown();
// 立即关闭,调用的是父类立即关闭的方法
public List&Runnable& shutdownNow() {
return super.shutdownNow();
// 返回使用这个执行器的任务队列
public BlockingQueue&Runnable& getQueue() {
return super.getQueue();
// 将DelayQueue&RunnableScheduledFuture& 包装为 BlockingQueue&Runnable&的类
// 类似于代理
private static class DelayedWorkQueue
extends AbstractCollection&Runnable&
implements BlockingQueue&Runnable& {
private final DelayQueue&RunnableScheduledFuture& dq = new DelayQueue&RunnableScheduledFuture&();
public Runnable poll() { return dq.poll(); }
public Runnable peek() { return dq.peek(); }
public Runnable take() throws InterruptedException { return dq.take(); }
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
return dq.poll(timeout, unit);
public boolean add(Runnable x) {
return dq.add((RunnableScheduledFuture)x);
public boolean offer(Runnable x) {
return dq.offer((RunnableScheduledFuture)x);
public void put(Runnable x) {
dq.put((RunnableScheduledFuture)x);
public boolean offer(Runnable x, long timeout, TimeUnit unit) {
return dq.offer((RunnableScheduledFuture)x, timeout, unit);
public Runnable remove() { return dq.remove(); }
public Runnable element() { return dq.element(); }
public void clear() { dq.clear(); }
public int drainTo(Collection&? super Runnable& c) { return dq.drainTo(c); }
public int drainTo(Collection&? super Runnable& c, int maxElements) {
return dq.drainTo(c, maxElements);
public int remainingCapacity() { return dq.remainingCapacity(); }
public boolean remove(Object x) { return dq.remove(x); }
public boolean contains(Object x) { return dq.contains(x); }
public int size() { return dq.size(); }
public boolean isEmpty() { return dq.isEmpty(); }
public Object[] toArray() { return dq.toArray(); }
public &T& T[] toArray(T[] array) { return dq.toArray(array); }
public Iterator&Runnable& iterator() {
return new Iterator&Runnable&() {
private Iterator&RunnableScheduledFuture& it = dq.iterator();
public boolean hasNext() { return it.hasNext(); }
public Runnable next() { return it.next(); }
public void remove() { it.remove(); }
}}& & 在代码中都加了注释,我想大致能解释清楚吧。& & Executor涉及的类还是比较多的,到此为止剩下的还有Executors& &&Executors& &&Executors中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。此类支持以下各种方法:创建并返回设置有常用配置字符串的 ExecutorService 的方法。创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。创建并返回&包装的&ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。&& & &Executors提供的都是工具形式的方法,所以都是static的,并且这个类也没有必要实例化,所以它的构造方法时private的。下面主要看一下几个内部类。  &RunnableAdapter 1 static final class RunnableAdapter&T& implements Callable&T& { 2
RunnableAdapter(Runnable
task, T result) { 5
this.task = 6
this.result = 7
public T call() { 9
task.run();10
}12 }& & 适配器。以Callable的形式执行Runnable并且返回给定的result。& & PrivilegedCallable 1 static final class PrivilegedCallable&T& implements Callable&T& { 2
private final AccessControlC 3
private final Callable&T& 4
private T 5
private E 6
PrivilegedCallable(Callable&T& task) { 7
this.task = 8
this.acc = AccessController.getContext(); 9
public T call() throws Exception {12
AccessController.doPrivileged(new PrivilegedAction&T&() {13
public T run() {14
result = task.call();16
} catch (Exception ex) {17
exception =18
}, acc);22
if (exception != null)23
}27 }& & 在访问控制下运行的Callable。涉及到Java.security包中的内容。& &&PrivilegedCallableUsingCurrentClassLoader类与上面的PrivilegedCallable类似,只是使用的是CurrentClassLoader。& &&DefaultThreadFactory 1
static class DefaultThreadFactory implements ThreadFactory { 2
static final AtomicInteger poolNumber = new AtomicInteger(1); 3
final ThreadG 4
final AtomicInteger threadNumber = new AtomicInteger(1); 5
final String nameP 6
DefaultThreadFactory() { 8
SecurityManager s = System.getSecurityManager(); 9
group = (s != null)? s.getThreadGroup() :10
Thread.currentThread().getThreadGroup();11
namePrefix = "pool-" +12
poolNumber.getAndIncrement() +13
"-thread-";14
public Thread newThread(Runnable r) {17
// 调用Thread构造方法创建线程18
Thread t = new Thread(group, r,19
namePrefix + threadNumber.getAndIncrement(),20
// 取消守护线程设置22
if (t.isDaemon())23
t.setDaemon(false);24
// 设置默认优先级25
if (t.getPriority() != Thread.NORM_PRIORITY)26
t.setPriority(Thread.NORM_PRIORITY);27 28
}& & DefaultThreadFactory 是默认的线程工程,提供创建线程的方法。& &&PrivilegedThreadFactory继承自DefaultThreadFactory,区别在于线程执行的run方法指定了classLoader并受到权限的控制。& &&DelegatedExecutorService继承自AbstractExecutorService,是一个包装类,暴露ExecutorService的方法。& &&DelegatedScheduledExecutorService继承自DelegatedExecutorService,实现了ScheduledExecutorService接口。它也是一个包装类,公开ScheduledExecutorService方法。&&
、 、 、 、 、}

我要回帖

更多关于 java nanoseconds 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信