java 线程池 调度策略如果满了 缓冲池也满了 有哪些拒绝策略处理

4412人阅读
java(135)
/blog/1123185
线程池还具有提高系统性能的优点,因为创建线程和清除线程的开销比较大。&
有两种不同类型的线程池:一是固定线程数量的线程池;二是可变数量的线程池。&
对于固定数量的线程池,可以使用Executors的静态方法 newFixedThreadPool 来创建 ExecutorService;或者利用 newSingleThreadPool来创建。&
& & & 而 ExecutorService 实现了 Executor 接口,这个接口中有一个方法:Execute(Runnable command),也就是执行线程。&
& & & 对于固定数量的线程池而言,如果需要执行的线程数量多于构造的数量,那么只能并发构造时的数量,剩下的线程就进入线程池的等待队列。&
& & & 如果不需要使用该线程池了,则使用 ExecutorService 中的 shutDown 方法,此时,该线程池就不会接受执行新的线程任务了。
对于可变数量的线程池,可用Executors的静态方法 newCachedThreadPool 来创建 ExecutorService,该线程池的大小是不定的,当执行任务时,会先选取缓存中的空闲线程来执行,如果没有空闲线程,则创建一个新的线程,而如果空闲线程的空闲状态超过60秒,则线程池删除该线程。
还有一种线程池:延迟线程池&
该线程池的创建有两个方法: Executors.newScheduledThreadPool(int corePoolSize);
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& Executors.newSingleScheduledExecutor();&
创建之后,会获得一个 ScheduledExecutorService。&
该对象的一个重要的方法就是: schedule(Runnable command, long delay, TimeUnit unit)
&该方法返回了一个 ScheduledFuture。
JDK1.5中加入了许多对并发特性的支持,例如:线程池。
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
Java代码&&
ThreadPoolExecutor(int&corePoolSize,&int&maximumPoolSize,long&keepAliveTime,&TimeUnit&unit,BlockingQueue&Runnable&&workQueue,RejectedExecutionHandler&handler)&&
corePoolSize:&&&&&&& 线程池维护线程的最少数量 (core : 核心)
maximumPoolSize:线程池维护线程的最大数量&
keepAliveTime:&&&& 线程池维护线程所允许的空闲时间
unit:&&&&&&&&&& 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler:&&&&& 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。&
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果线程池中运行的线程&小于corePoolSize&,即使线程池中的线程都处于空闲状态,也要&创建新的线程&来处理被添加的任务。
如果线程池中运行的线程大于等于corePoolSize,但是缓冲队列 workQueue未满&,那么任务被放入缓冲队列&。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满(即无法将请求加入队列&),并且线程池中的数量小于maximumPoolSize,建新的线程&来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize&,那么通过&handler&所指定的策略来处理此任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止&。这样,线程池可以动态的调整池中的线程数。
也就是:处理任务的优先级为:
corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。&
当然,如果用的是无界的缓冲队列,那么当线程等于corePoolSIze,小于maximumPoolSize,任务就会不停的添加到队列中,也不会创建新线程,也不会丢弃。
Java代码&&
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:&&
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。&&
workQueue常用的是:&&
&&&&&&&java.util.concurrent.ArrayBlockingQueue&&
handler有四个选择:&&
ThreadPoolExecutor.AbortPolicy() &//默认的处理方式
ThreadPoolExecutor.CallerRunsPolicy()&&
ThreadPoolExecutor.DiscardOldestPolicy()&&
ThreadPoolExecutor.DiscardPolicy() &//推荐的处理方式
一个例子:&
Java代码&&
package&&&
import&java.util.concurrent.ArrayBlockingQ&&
import&java.util.concurrent.ThreadPoolE&&
import&java.util.concurrent.TimeU&&
public&class&TestThreadPool&{&&
&&&&private&static&int&produceTaskSleepTime&=&2;&&
&&&&private&static&int&produceTaskMaxNumber&=&10;&&
&&&&public&static&class&ThreadPoolTask&implements&Runnable{&&
&&&&&&&&&&
&&&&&&&&private&Object&threadPoolTaskD&&
&&&&&&&&ThreadPoolTask(Object&tasks)&{&&
&&&&&&&&&&&&this.threadPoolTaskData&=&&&
&&&&&&&&}&&
&&&&&&&&public&void&run()&{&&
&&&&&&&&&&&&&&
&&&&&&&&&&&&System.out.println(&start&..&&+&threadPoolTaskData);&&
&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&Thread.sleep(2000);&&
&&&&&&&&&&&&}&catch&(Exception&e)&{&&
&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&}&&
&&&&&&&&&&&&threadPoolTaskData&=&null;&&
&&&&&&&&}&&
&&&&&&&&public&Object&getTask()&{&&
&&&&&&&&&&&&return&this.threadPoolTaskD&&
&&&&&&&&}&&
&&&&}&&&&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&&&
&&&&&&&&ThreadPoolExecutor&threadPool&=&new&ThreadPoolExecutor(2,&4,&3,&&
&&&&&&&&&&&&&&&&TimeUnit.SECONDS,&new&ArrayBlockingQueue&Runnable&(3),&&
&&&&&&&&&&&&&&&&new&ThreadPoolExecutor.DiscardOldestPolicy());&&
&&&&&&&&for&(int&i&=&1;&i&&=&produceTaskMaxN&i++)&{&&
&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&String&task&=&&task@&&&+&i;&&
&&&&&&&&&&&&&&&&System.out.println(&put&&&+&task);&&
&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&threadPool.execute(new&ThreadPoolTask(task));&&
&&&&&&&&&&&&&&&&&&
&&&&&&&&&&&&&&&&Thread.sleep(produceTaskSleepTime);&&
&&&&&&&&&&&&}&catch&(Exception&e)&{&&
&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&}&&
&&&&&&&&}&&
1、在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。
2、一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。
3、在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。
4、这个小组里面队员至少有两个,如果他们两个忙不过来,任务就被放到任务列表里面。
如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员,至多只能雇佣 4个。
5、如果四个队员都在忙时,再有新的任务,这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发,直到接受这个任务为止(更残忍!呵呵)。
因为队员工作是需要成本的,如果工作很闲,闲到 3 秒都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。
本例来源:
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
线程池的类体系结构:&
ExecutorService: &&& &&& 真正的线程池接口。
ScheduledExecutorService &&& 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。
ThreadPoolExecutor &&& &&& ExecutorService的默认实现。
ScheduledThreadPoolExecutor &&& 继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。
Executors 工厂方法:&
Java代码&&
newSingleThreadExecutor:&&
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。&&
newFixedThreadPool:&&
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。&&
newCachedThreadPool:&&&
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。&&
newScheduledThreadPool:&&
ThreadPoolExecutor是Executors类的底层实现:
ExecutorService newFixedThreadPool(int nThreads) 固定大小线程池
Java代码&&
public&static&ExecutorService&newFixedThreadPool(int&nThreads)&{&&
&&&&&&&&return&new&ThreadPoolExecutor(nThreads,&nThreads,&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&0L,&TimeUnit.MILLISECONDS,&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&new&LinkedBlockingQueue&Runnable&());&&
&corePoolSize和maximumPoolSize的大小是一样的(实际上,如果使用无界queue的话maximumPoolSize参数是没有意义的),BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。
一个例子:
Java代码&&
public&class&ThreadPool&{&&
&&&&private&static&void&doSomething(int&id)&{&&
&&&&&&&&System.out.println(&start&do&&&+&id&+&&&task&…&);&&
&&&&&&&&try&{&&
&&&&&&&&&&&&Thread.sleep(1000&*&2);&&
&&&&&&&&}&catch&(InterruptedException&e)&{&&
&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&}&&
&&&&&&&&System.out.println(&start&do&&&+&id&+&&&finished.&);&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&ExecutorService&executorService&=&Executors.newFixedThreadPool(2);&&
&&&&&&&&&&&&&&
&&&&&&&&&&
&&&&&&&&executorService.submit(new&Runnable()&{&&
&&&&&&&&&&&&public&void&run()&{&&
&&&&&&&&&&&&&&&&doSomething(1);&&
&&&&&&&&&&&&}&&
&&&&&&&&});&&
&&&&&&&&executorService.execute(new&Runnable()&{&&
&&&&&&&&&&&&public&void&run()&{&&
&&&&&&&&&&&&&&&&doSomething(2);&&
&&&&&&&&&&&&}&&
&&&&&&&&});&&
&&&&&&&&executorService.shutdown();&&
&&&&&&&&System.out.println(&&&main&thread&end.&);&&&
ExecutorService newSingleThreadExecutor() 单线程
Java代码&&
public&static&ExecutorService&newSingleThreadExecutor()&{&&
&&&&&&&&return&new&FinalizableDelegatedExecutorService&&
&&&&&&&&&&&&(new&ThreadPoolExecutor(1,&1,&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&0L,&TimeUnit.MILLISECONDS,&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&new&LinkedBlockingQueue&Runnable&()));&&
&最大和最小都为1
ExecutorService newCachedThreadPool() 无界线程池
Java代码&&
public&static&ExecutorService&newCachedThreadPool()&{&&
&&&&&&&&return&new&ThreadPoolExecutor(0,&Integer.MAX_VALUE,&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&60L,&TimeUnit.SECONDS,&&
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&new&SynchronousQueue&Runnable&());&&
这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用&SynchronousQueue
。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中,&每个插入操作必须等待另一个
线程的对应移除操作。&比如,我先添加一个元素,接下来如果继续想尝试添加则会阻塞,直到另一个线程取走一个元素,反之亦然。(想到什么?就是缓冲区为1的生产者消费者模式^_^)
以下为重要分析:
到此如果有很多疑问,那是必然了(除非你也很了解了)
先从&&&&&workQueue这个入参开始说起。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。以下为引用:(我会稍微修改一下,并用红色突出显示)
所有&都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize&,则任务根本不会存放,添加到queue中&,而是直接&抄家伙(thread)开始运行&)如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列&,而不添加新的线程&。如果无法将请求加入队列,则创建新的线程&,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
先不着急举例子,因为首先需要知道队列的三种类型&。
排队有三种通用策略:
直接提交。&工作队列的默认选项是&,它将任务直接提交给线程而不保持它们&。在此,如果不存在可用于立即运行任务的线程&,则试图把任务加入队列将失败,因此会构造一个新的线程&。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界
maximumPoolSizes 以避免拒绝新提交的任务&。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。无界队列。&使用无界队列(例如,不具有预定义容量的&)将导致在所有
corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize&。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。有界队列。&当使用有限的 maximumPoolSizes 时,有界队列(如&)有助于防止资源耗尽&,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低
CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。 &
到这里,该了解的理论已经够多了,可以调节的就是corePoolSize和maximumPoolSizes 这对参数还有就是BlockingQueue的选择。
例子一:使用直接提交策略,也即SynchronousQueue。
首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,但是由于该Queue本身的特性&,在某次添加元素后必须等待其他线程取走后才能继续添加&。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。
我们使用一下参数构造ThreadPoolExecutor:
Java代码&&
new&ThreadPoolExecutor(&&&&
&&&&&&&&&&&&2,&3,&30,&TimeUnit.SECONDS,&&&&&
&&&&&&&&&&&&new&&span&style=&white-space:&&&SynchronousQueue&/span&&Runnable&(),&&&&&
&&&&&&&&&&&&new&RecorderThreadFactory(&CookieRecorderPool&),&&&&&
&&&&&&&&&&&&new&ThreadPoolExecutor.CallerRunsPolicy());&&&&
Java代码&&
new&ThreadPoolExecutor(&&
&&&&&&&&&&&&2,&3,&30,&TimeUnit.SECONDS,&&&
&&&&&&&&&&&&new&&span&style=&white-space:&&&SynchronousQueue&/span&&&
&Runnable&(),&&&
&&&&&&&&&&&&new&RecorderThreadFactory(&CookieRecorderPool&),&&&
&&&&&&&&&&&&new&ThreadPoolExecutor.CallerRunsPolicy());&&
&当核心线程已经有2个正在运行.
此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列&,而不添加新的线程&。”,所以A被添加到queue中。又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程&,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。
所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁&。
什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中
例子二:使用无界队列策略,即
这个就拿newFixedThreadPool&来说,根据前文提到的规则:
如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
&那么当任务继续增加,会发生什么呢?
如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
&OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?
如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于无界队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程!&corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,如果任务内存大一些,不一会儿就爆了,呵呵。
可以仔细想想哈。
例子三:有界队列,使用
这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。
举例来说,请看如下构造方法:
Java代码&&
new&ThreadPoolExecutor(&&&&
&&&&&&&&&&&&2,&4,&30,&TimeUnit.SECONDS,&&&&&
&&&&&&&&&&&&new&ArrayBlockingQueue&Runnable&(2),&&&&&
&&&&&&&&&&&&new&RecorderThreadFactory(&CookieRecorderPool&),&&&&&
&&&&&&&&&&&&new&ThreadPoolExecutor.CallerRunsPolicy());&&&
Java代码&&
new&ThreadPoolExecutor(&&
&&&&&&&&&&&&2,&4,&30,&TimeUnit.SECONDS,&&&
&&&&&&&&&&&&new&ArrayBlockingQueue&Runnable&(2),&&&
&&&&&&&&&&&&new&RecorderThreadFactory(&CookieRecorderPool&),&&&
&&&&&&&&&&&&new&ThreadPoolExecutor.CallerRunsPolicy());&&
假设,所有的任务都永远无法执行完。
对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queu中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。
ThreadPoolExecutor的使用还是很有技巧的。使用无界queue可能会耗尽系统资源。使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小线程数自然也有开销,所以需要根据不同应用进行调节。
通常来说对于静态任务可以归为:
数量大,但是执行时间很短数量小,但是执行时间较长数量又大执行时间又长除了以上特点外,任务间还有些内在关系
看完这篇问文章后,希望能够可以选择合适的类型了
/java-executorservice-%E9%87%8D%E8%A6%81bug%E4%B8%A4%E4%BE%8B.html
static class Task1 implements Runnable {
public Task1(int id) {
public void run() {
System.out.println(Thread.currentThread() +& task & + id + & begin&);
Thread.sleep(5000);
} catch (InterruptedException e) {
if (id & 5) {
throw new RuntimeException(Thread.currentThread() + & task & +id+ & exception&);
static ExecutorService pool = Executors.newFixedThreadPool(5);
public void test() throws Exception {
public static void main(String[] args) throws Exception
for (int i = 0; i & 10; ++i) {
pool.execute(new Task1(i));
ThreadPoolExecutor threads =(ThreadPoolExecutor)
for (int i= 0; i & 40; i++) {
System.out.println(&pool size = & + threads.getPoolSize() + & Largest = & + threads.getLargestPoolSize() + & Max = & + threads.getMaximumPoolSize());
Thread.sleep(1000);
pool.shutdown();
ToyClient client = new ToyClient();
client.test();
}起始线程池里有5个线程,前五个线程都抛出异常,这五个线程因为异常而死掉,线程池会再创建五个新线程
所以建议用submit提交任务,如果异常在线程中发生,当前线程不会处理这个异常,当前线程也不会死掉
如果任务实现的是callable接口,会在future.get();结果返回时抛出异常
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:1321651次
积分:16453
积分:16453
排名:第394名
原创:302篇
转载:687篇
评论:125条
(1)(3)(2)(1)(2)(4)(3)(1)(3)(2)(4)(15)(1)(3)(5)(11)(7)(6)(12)(5)(1)(8)(15)(13)(31)(27)(30)(43)(6)(13)(9)(14)(11)(18)(41)(41)(94)(100)(81)(52)(47)(32)(77)(54)(32)(10)一 使用线程池的好处
合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
二 在何种情况下使用线程池
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
<span style="color:#.&&&&&&&&&&&&&&&&&&&任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
<span style="color:#.&&&&&&&&&&&&&&&&&&&任务的优先级:高,中和低。
<span style="color:#.&&&&&&&&&&&&&&&&&&&任务的执行时间:长,中和短。
<span style="color:#.&&&&&&&&&&&&&&&&&&&任务的依赖性:是否依赖其他系统资源,如数据库连接。
任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu&#43;1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程(对数据库写入操作来说大部分时间都放在了IO上,因此会耗费大量时间在等待提交数据后的结果。此时应该配置肯能多的线程),如<span style="color:#*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。当然,我们也可以写一个监控程序,实时尚未写入数据库并缓存在工作队列中的数据总量,并动态的通过sleep方法来降低写入数据库的压力,具体可以参见多线程批量向数据库中插入数据项目链接:
/s/1eQEkfZG
三线程池原理
我们可以通过ThreadPoolExecutor来创建一个线程池。创建一个线程池需要输入几个参数:
newThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime,milliseconds,runnableTaskQueue, threadFactory,handler);
corePoolSize(上图中核心线程池数量线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
·&&&&&&&&&&&&&&&&&&&&&&&&&&&runnableTaskQueue(上图中的workqueue任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
<span style="color:#.&&&&&&&&&&&&&&&&&&&ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
<span style="color:#.&&&&&&&&&&&&&&&&&&&LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO(先进先出)排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
<span style="color:#.&&&&&&&&&&&&&&&&&&&SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
<span style="color:#.&&&&&&&&&&&&&&&&&&&PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
·&&&&&&&&&&&&&&&&&&&&&&&&&&&maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。&#20540;得注意的是如果使用了无界的任务队列这个参数就没什么效果。
从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:
<span style="color:#.&&&&&&&&&&&&&&&&&&&首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
<span style="color:#.&&&&&&&&&&&&&&&&&&&其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
<span style="color:#.&&&&&&&&&&&&&&&&&&&最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
&举例说明:
&假设coresize设置为<span style="color:#,maximumPoolSize设置为<span style="color:#,工作队列为有界队列workequeque其大小为<span style="color:#,。假如此时线程池中没有任何任务,等待插入执行的任务依次为t1,t2,t3,t4,t5,t6,t7,t8….tn。限制条件为每个任务t执行时间都很长。
由于coresize为<span style="color:#,从t1到t4不需要等待直接创建<span style="color:#个线程开始执行。等到t5进入时候,由于coresize已经满了,但是workqueque还有三个位置空着,则t5进入workqueque,同理t6,t7也进入队列中等待。此时任务t8进入,
此时状态为(<span style="color:#)coresize已满
&&&&&&&&&& (<span style="color:#)worqueue已满
&&&&&&&&&& (<span style="color:#)maximumPoolSize- coresize=2(还有两个位置,可以创建新线程)
&&&&&&&&&& 但是线程池中的线程数尚未达到最大&#20540;<span style="color:#(当前线程池有<span style="color:#个线程正在运行),于是线程池为t8新建一个线程。此时会出现一个情况即虽然t8进入线程池的时间比t5、t6、t7晚,但是t8会先于在workqueue中等待的(t5、t6、t7)执行
&&&& 同样t9过来后状态
此时状态为(<span style="color:#)coresize已满
&&&&&&&&&& (<span style="color:#)worqueue已满
&&&&&&&&&& (<span style="color:#)maximumPoolSize- coresize=2(t8开启了线程,还有<span style="color:#个位置,可以创建新线程)t9也会先执行。
当t10过来时候发现
&&&&& (<span style="color:#)coresize已满
&&&&& (<span style="color:#)worqueue已满
&&&&& (<span style="color:#)maximumPoolSize- coresize=2(t8、t9开启了线程还有<span style="color:#个位置,不可以创建新线程)t10会被线程池拒绝执行。我们可以自己实现RejectedExecutionHandler接口,如下所示。当任务被拒绝执行的时候会自动调用我们的接口。
package com.journaldev.
import java.util.concurrent.RejectedExecutionH
import java.util.concurrent.ThreadPoolE
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + & is rejected&);
·&&&&&&&&&&&&&&&&&&&&&&&&&&&ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。n&
AbortPolicy:直接抛出异常。
<span style="color:#.&&&&&&&&&&&&&&&&&&&CallerRunsPolicy:只用调用者所在线程来运行任务。
<span style="color:#.&&&&&&&&&&&&&&&&&&&DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
<span style="color:#.&&&&&&&&&&&&&&&&&&&DiscardPolicy:不处理,丢弃掉。
<span style="color:#.&&&&&&&&&&&&&&&&&&&当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
·&&&&&&&&&&&&&&&&&&&&&&&&&&&keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
·&&&&&&&&&&&&&&&&&&&&&&&&&&&TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS,千分之一毫秒)和毫微秒(NANOSECONDS,千分之一微秒)。
线程池监控方法
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
·&&&&&&&&&&&&&&&&&&&&&&&&&&&taskCount:线程池需要执行的任务数量(只要被线程池接受的任务都计入在内)。
·&&&&&&&&&&&&&&&&&&&&&&&&&&&completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
·&&&&&&&&&&&&&&&&&&&&&&&&&&&largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
·&&&&&&&&&&&&&&&&&&&&&&&&&&&getPoolSize:线程池的线程数量。即使所有任务完成后,线程还会继续存活一段时间,可以具体参考下TimeUnit(线程活动保持时间的单位)。
·&&&&&&&&&&&&&&&&&&&&&&&&&&&getActiveCount:获取活动的线程数。
四 代码学习实例
& & & & & & & & &
<span style="color:#)等待运行的任务类
package com.journaldev.
public class WorkerThread implements Runnable {
public WorkerThread(String s){
public void run() {
System.out.println(Thread.currentThread().getName()+& Start. Command = &+command);
processCommand();
System.out.println(Thread.currentThread().getName()+& End.&);
private void processCommand() {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
public String toString(){
return (&线程:&+mand);
2)&&&&我们创建一个测试线程程序,创建一个固定大小的线程池从 Executors框架中。
package com.journaldev.
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import java.util.concurrent.TimeU
public class SimpleThreadPool {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i & 10; i++) {
Runnable worker = new WorkerThread(& &+ i);
executor.execute(worker);
System.out.println(&worker &+i+& added&);
//不存在阻塞 前五个任务直接进入线程池开启线程执行 后五项任务加入工作队列等待执行
executor.shutdown();
while(!executor.awaitTermination(1, TimeUnit.SECONDS))
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
/* while (!executor.isTerminated()) {
}//一直循环 直到全部任务完成
System.out.println(&Finished all threads&);
3)下面是一个自定义实现RejectedExecutionHandler 接口的例子:
package com.journaldev.
import java.util.concurrent.RejectedExecutionH
import java.util.concurrent.ThreadPoolE
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + & is rejected&);
4)ThreadPoolExecutor提供了使用它我们可以找出执行者,池大小,活动线程数和任务数的当前状态的几种方法。所以,我有一个监视线程,这将在一定的时间间隔打印的执行人的信息。
package com.journaldev.
import java.util.concurrent.ThreadPoolE
public class MyMonitorThread implements Runnable
private ThreadPoolE
private boolean run=
public MyMonitorThread(ThreadPoolExecutor executor, int delay)
this.executor =
this.seconds=
public void shutdown(){
public void run()
while(run){
System.out.println(
String.format(&[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s&,
this.executor.getPoolSize(),//线程池中保存着的线程数
this.executor.getCorePoolSize(),//线程池基本大小
this.executor.getActiveCount(),//当前活动的线程大小
this.executor.getCompletedTaskCount(),//已经完成的任务数量
this.executor.getTaskCount(),//所有任务总量
this.executor.isShutdown(),//是否已经关闭(不再接受新的任务)
this.executor.isTerminated()));//是否所所有任务完成 只要保存着
Thread.sleep(seconds*1000);
} catch (InterruptedException e) {
e.printStackTrace();
5)这里是实现线程池的例子使用线程池:
package com.journaldev.
import java.util.concurrent.ArrayBlockingQ
import java.util.concurrent.E
import java.util.concurrent.ThreadF
import java.util.concurrent.ThreadPoolE
import java.util.concurrent.TimeU
public class WorkerPool {
public static void main(String args[]) throws InterruptedException{
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//creating the ThreadPoolExecutor
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue&Runnable&(2), threadFactory, rejectionHandler);
//start the monitoring thread
MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
Thread monitorThread = new Thread(monitor);
monitorThread.start();
//submit work to the thread pool
for(int i=0; i&10; i++){
executorPool.execute(new WorkerThread(&cmd&+i));
executorPool.shutdown();//执行此方法会立刻执行,一定要加上此句,否则会认为线程池还会有新的任务
/*shutdown方法的作用:(1)将线程池设置为shutdown状态(2)中断没有正在执行任务的线程(因为线程池中会一直维护着若干个线程处于活动状态,中断没有正
在执行任务的线程意味着关闭空闲线程)(3)线程池不能再接受新的任务,但会一直等待已经添加到工作队列中的任务完成。*/
//不加的话executorPool.awaitTermination(1, TimeUnit.SECONDS)会一直为false
//每隔一秒检测线程池是否完成所有任务 也可以使用 while (!executor.isTerminated())
while(!executorPool.awaitTermination(1, TimeUnit.SECONDS))
catch (InterruptedException e)
{e.printStackTrace();}
&/pre&&pre&执行结果:
pool-1-thread-1 Start. Command = cmd0
pool-1-thread-2 Start. Command = cmd1
线程:cmd6 is rejected
线程:cmd7 is rejected
线程:cmd8 is rejected
pool-1-thread-4 Start. Command = cmd5
pool-1-thread-3 Start. Command = cmd4
线程:cmd9 is rejected
[monitor] [0/2] Active: 2, Completed: 0, Task: 4, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: true, isTerminated: false
pool-1-thread-1 End.
pool-1-thread-2 End.
pool-1-thread-1 Start. Command = cmd2
pool-1-thread-2 Start. Command = cmd3
pool-1-thread-4 End.
pool-1-thread-3 End.
[monitor] [2/2] Active: 2, Completed: 4, Task: 6, isShutdown: true, isTerminated: false
[monitor] [2/2] Active: 2, Completed: 4, Task: 6, isShutdown: true, isTerminated: false
pool-1-thread-2 End.
pool-1-thread-1 End.
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
具体原因可以参考上面线程池处理流程举例说明。测试项目源代码下载链接
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:87315次
积分:2610
积分:2610
排名:第9837名
原创:169篇
转载:29篇
评论:19条
(2)(5)(1)(21)(17)(14)(13)(4)(4)(2)(2)(1)(4)(6)(6)(7)(3)(1)(2)(4)(12)(2)(2)(5)(4)(7)(10)(10)(22)(1)(1)(3)}

我要回帖

更多关于 java线程池策略 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信