「java线程池的拒绝实现」java线程池的拒绝策略

博主:adminadmin 2023-01-05 04:00:06 874

今天给各位分享java线程池的拒绝实现的知识,其中也会对java线程池的拒绝策略进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

面试题:线程池有几种实现方式,线程池的七大参数有哪些?

在Java编码的过程中,我们经常会创建一个线程来提高程序的执行效率,虽然这样实现起来很方便,但是会有一个问题:如果并发的线程数多,并且每个线程都是执行一个时间很短的任务就结束了,这样会造成频繁的创建和销毁线程从而导致降低系统的效率。

那么问题来了,有没有办法可用复用创建好的线程呢,也就是线程执行完一个任务后,不被销毁,继续执行其他的任务?

在Java可以通过线程池来实现这样的效果。

下面从三个方面和大家一起来探讨一下Java线程池相关的内容。

1.Java中的ThreadPoolExecutor类。

2.Java中4种线程池的使用。

3.Java线程池常用参数如何设置。

一、Java中的ThreadPoolExecutor类

A.ThreadPoolExecutor的重要参数

1.corePoolSize:核心线程数

核心线程会一直存活,及时没有任务需要执行。

当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理。

设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。

2.queueCapacity:任务队列容量(阻塞队列)

当核心线程数达到最大时,新任务会放在队列中排队等待执行。

3.maxPoolSize:最大线程数

当线程数=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务。

当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常。

4.keepAliveTime:线程空闲时间

当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize。

如果allowCoreThreadTimeout=true,则会直到线程数量=0。

5.allowCoreThreadTimeout:允许核心线程超时

6.rejectedExecutionHandler:任务拒绝处理器。

B.ThreadPoolExecutor执行过程

1.当线程数小于核心线程数时,创建线程。

2.当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。

3.当线程数大于等于核心线程数,且任务队列已满。(1)若线程数小于最大线程数,创建线程。(2)若线程数等于最大线程数,抛出异常,拒绝任务。

二、Java中4种线程池

Java通过Executors提供四种线程池,分别为:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

线程池的拒绝策略示例

Java的线程池中,如果不断往线程池提交任务,最终会发生什么?

如果work queue是一个有界队列,队列放满,线程数量达到maxsize,且没有空闲线程时,再往线程池提交任务会触发线程池的拒绝策略。

线程池有哪些拒绝策略呢?

一共提交8个任务,其中有一个默默被丢弃。

线程池1个核心线程,max线程数为2,work queue大小为5.

可以看到,提交8个任务后,第2个任务被丢弃了。因为第2个任务是oldest,第一个被放进queue的任务。

用这种拒绝策略时要注意,主线程既需要负责创建线程,又需要执行任务,会造成性能问题。

在输出中,能看出,主线程号为1,而提交的任务中,其中一个任务(最后一个被提交的任务)就是由主线程来执行的。

了解了前四种拒绝策略,发现:

abort,discard,discardOldest都会丢弃任务;

callerRun虽然执行了任务,但是会影响主线程性能。

若将work queue设置为无界队列,或者将maxsize设置为最大整数,都有可能造成out of memory。

那么可以通过自定义拒绝策略,让后进来的task阻塞住,有资源了再处理。这样可以让每一个任务都得到执行。

线程池 java中怎么设置拒绝策略

应该是没有这个功能的,因为线程池里面的线程实际上是复用的,即执行完一个Job以后会从Quenue(任务队列)里面取新的JOB。 

如果有这样的需求可以: 

1)控制JOB的执行时间不能太长,否则可能会造成阻塞; 

2)在JOB的实现(run方法)里面做相应的控制; 

3)如果JOB有长时间和短时间两种模式,可以考虑放在两个线程池中,避免长时间的任务阻塞短时间的任务; 

4)也可以控制等待队列的任务个数,但是Executors默认的Factory方法是没有这个参数的,需要直接new ThreadPoolExecutor

java线程池怎么实现

要想理解清楚java线程池实现原理,明白下面几个问题就可以了:

(1):线程池存在哪些状态,这些状态之间是如何进行切换的呢?

(2):线程池的种类有哪些?

(3):创建线程池需要哪些参数,这些参数的具体含义是什么?

(4):将任务添加到线程池之后运行流程?

(5):线程池是怎么做到重用线程的呢?

(6):线程池的关闭

首先回答第一个问题:线程池存在哪些状态;

查看ThreadPoolExecutor源码便知晓:

[java] view plain copy

// runState is stored in the high-order bits

private static final int RUNNING    = -1  COUNT_BITS;

private static final int SHUTDOWN   =  0  COUNT_BITS;

private static final int STOP       =  1  COUNT_BITS;

private static final int TIDYING    =  2  COUNT_BITS;

private static final int TERMINATED =  3  COUNT_BITS;

存在5种状态:

1Running:可以接受新任务,同时也可以处理阻塞队列里面的任务;

2Shutdown:不可以接受新任务,但是可以处理阻塞队列里面的任务;

3Stop:不可以接受新任务,也不处理阻塞队列里面的任务,同时还中断正在处理的任务;

4Tidying:属于过渡阶段,在这个阶段表示所有的任务已经执行结束了,当前线程池中是不存在有效的线程的,并且将要调用terminated方法;

5Terminated:终止状态,这个状态是在调用完terminated方法之后所处的状态;

那么这5种状态之间是如何进行转换的呢?查看ThreadPoolExecutor源码里面的注释便可以知道啦:

[java] view plain copy

* RUNNING - SHUTDOWN

*    On invocation of shutdown(), perhaps implicitly in finalize()

* (RUNNING or SHUTDOWN) - STOP

*    On invocation of shutdownNow()

* SHUTDOWN - TIDYING

*    When both queue and pool are empty

* STOP - TIDYING

*    When pool is empty

* TIDYING - TERMINATED

*    When the terminated() hook method has completed

从上面可以看到,在调用shutdown方法的时候,线程池状态会从Running转换成Shutdown;在调用shutdownNow方法的时候,线程池状态会从Running/Shutdown转换成Stop;在阻塞队列为空同时线程池为空的情况下,线程池状态会从Shutdown转换成Tidying;在线程池为空的情况下,线程池状态会从Stop转换成Tidying;当调用terminated方法之后,线程池状态会从Tidying转换成Terminate;

在明白了线程池的各个状态以及状态之间是怎么进行切换之后,我们来看看第二个问题,线程池的种类:

(1):CachedThreadPool:缓存线程池,该类线程池中线程的数量是不确定的,理论上可以达到Integer.MAX_VALUE个,这种线程池中的线程都是非核心线程,既然是非核心线程,那么就存在超时淘汰机制了,当里面的某个线程空闲时间超过了设定的超时时间的话,就会回收掉该线程;

(2):FixedThreadPool:固定线程池,这类线程池中是只存在核心线程的,对于核心线程来说,如果我们不设置allowCoreThreadTimeOut属性的话是不存在超时淘汰机制的,这类线程池中的corePoolSize的大小是等于maximumPoolSize大小的,也就是说,如果线程池中的线程都处于活动状态的话,如果有新任务到来,他是不会开辟新的工作线程来处理这些任务的,只能将这些任务放到阻塞队列里面进行等到,直到有核心线程空闲为止;

(3):ScheduledThreadPool:任务线程池,这种线程池中核心线程的数量是固定的,而对于非核心线程的数量是不限制的,同时对于非核心线程是存在超时淘汰机制的,主要适用于执行定时任务或者周期性任务的场景;

(4):SingleThreadPool:单一线程池,线程池里面只有一个线程,同时也不存在非核心线程,感觉像是FixedThreadPool的特殊版本,他主要用于确保任务在同一线程中的顺序执行,有点类似于进行同步吧;

接下来我们来看第三个问题,创建线程池需要哪些参数:

同样查看ThreadPoolExecutor源码,查看创建线程池的构造函数:

[java] view plain copy

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueueRunnable workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler)

不管你调用的是ThreadPoolExecutor的哪个构造函数,最终都会执行到这个构造函数的,这个构造函数有7个参数,正是由于对这7个参数值的赋值不同,造成生成不同类型的线程池,比如我们常见的CachedThreadPoolExecutor、FixedThreadPoolExecutor

SingleThreadPoolExecutor、ScheduledThreadPoolExecutor,我们老看看这几个参数的具体含义:

1corePoolSize:线程池中核心线程的数量;当提交一个任务到线程池的时候,线程池会创建一个线程来执行执行任务,即使有其他空闲的线程存在,直到线程数达到corePoolSize时不再创建,这时候会把提交的新任务放入到阻塞队列中,如果调用了线程池的preStartAllCoreThreads方法,则会在创建线程池的时候初始化出来核心线程;

2maximumPoolSize:线程池允许创建的最大线程数;如果阻塞队列已经满了,同时已经创建的线程数小于最大线程数的话,那么会创建新的线程来处理阻塞队列中的任务;

3keepAliveTime:线程活动保持时间,指的是工作线程空闲之后继续存活的时间,默认情况下,这个参数只有线程数大于corePoolSize的时候才会起作用,即当线程池中的线程数目大于corePoolSize的时候,如果某一个线程的空闲时间达到keepAliveTime,那么这个线程是会被终止的,直到线程池中的线程数目不大于corePoolSize;如果调用allowCoreThreadTimeOut的话,在线程池中线程数量不大于corePoolSize的时候,keepAliveTime参数也可以起作用的,知道线程数目为0为止;

4unit:参数keepAliveTime的时间单位;

5workQueue:阻塞队列;用于存储等待执行的任务,有四种阻塞队列类型,ArrayBlockingQueue(基于数组的有界阻塞队列)、LinkedBlockingQueue(基于链表结构的阻塞队列)、SynchronousQueue(不存储元素的阻塞队列)、PriorityBlockingQueue(具有优先级的阻塞队列);

6threadFactory:用于创建线程的线程工厂;

7handler:当阻塞队列满了,且没有空闲线程的情况下,也就是说这个时候,线程池中的线程数目已经达到了最大线程数量,处于饱和状态,那么必须采取一种策略来处理新提交的任务,我们可以自己定义处理策略,也可以使用系统已经提供给我们的策略,先来看看系统为我们提供的4种策略,AbortPolicy(直接抛出异常)、CallerRunsPolicy(只有调用者所在的线程来运行任务)、DiscardOldestPolicy(丢弃阻塞队列中最近的一个任务,并执行当前任务)、Discard(直接丢弃);

接下来就是将任务添加到线程池之后的运行流程了;

我们可以调用submit或者execute方法,两者最大的区别在于,调用submit方法的话,我们可以传入一个实现Callable接口的对象,进而能在当前任务执行结束之后通过Future对象获得任务的返回值,submit内部实际上还是执行的execute方法;而调用execute方法的话,是不能获得任务执行结束之后的返回值的;此外,调用submit方法的话是可以抛出异常的,但是调用execute方法的话,异常在其内部得到了消化,也就是说异常在其内部得到了处理,不会向外传递的;

因为submit方法最终也是会执行execute方法的,因此我们只需要了解execute方法就可以了:

在execute方法内部会分三种情况来进行处理:

1:首先判断当前线程池中的线程数量是否小于corePoolSize,如果小于的话,则直接通过addWorker方法创建一个新的Worker对象来执行我们当前的任务;

2:如果说当前线程池中的线程数量大于corePoolSize的话,那么会尝试将当前任务添加到阻塞队列中,然后第二次检查线程池的状态,如果线程池不在Running状态的话,会将刚刚添加到阻塞队列中的任务移出,同时拒绝当前任务请求;如果第二次检查发现当前线程池处于Running状态的话,那么会查看当前线程池中的工作线程数量是否为0,如果为0的话,就会通过addWorker方法创建一个Worker对象出来处理阻塞队列中的任务;

3:如果原先线程池就不处于Running状态或者我们刚刚将当前任务添加到阻塞队列的时候出现错误的话,那么会去尝试通过addWorker创建新的Worker来处理当前任务,如果添加失败的话,则拒绝当前任务请求;

可以看到在上面的execute方法中,我们仅仅只是检查了当前线程池中的线程数量有没有超过corePoolSize的情况,那么当前线程池中的线程数量有没有超过maximumPoolSize是在哪里检测的呢?实际上是在addWorker方法里面了,我们可以看下addWorker里面的一段代码:

[java] view plain copy

if (wc = CAPACITY ||

wc = (core ? corePoolSize : maximumPoolSize))

return false;

如果当前线程数量超过maximumPoolSize的话,直接就会调用return方法,返回false;

其实到这里我们很明显可以知道,一个线程池中线程的数量实际上就是这个线程池中Worker的数量,如果Worker的大小超过了corePoolSize,那么任务都在阻塞队列里面了,Worker是Java对我们任务的一个封装类,他的声明是酱紫的:

[java] view plain copy

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

可以看到他实现了Runnable接口,他是在addWorker方法里面通过new Worker(firstTask)创建的,我们来看看他的构造函数就知道了:

[java] view plain copy

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

而这里的firstTask其实就是我们调用execute或者submit的时候传入的那个参数罢了,一般来说这些参数是实现Callable或者Runnable接口的;

在通过addWorker方法创建出来Worker对象之后,这个方法的最后会执行Worker内部thread属性的start方法,而这个thread属性实际上就是封装了Worker的Thread,执行他的start方法实际上执行的是Worker的run方法,因为Worker是实现了Runnable接口的,在run方法里面就会执行runWorker方法,而runWorker方法里面首先会判断当前我们传入的任务是否为空,不为空的话直接就会执行我们通过execute或者submit方法提交的任务啦,注意一点就是我们虽然会通过submit方法提交实现了Callable接口的对象,但是在调用submit方法的时候,其实是会将Callable对象封装成实现了Runnable接口对象的,不信我们看看submit方法源码是怎么实现的:

[java] view plain copy

public T FutureT submit(CallableT task) {

if (task == null) throw new NullPointerException();

RunnableFutureT ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

看到没有呢,实际上在你传入实现了Callable接口对象的时候,在submit方法里面是会将其封装成RunnableFuture对象的,而RunnableFuture接口是继承了Runnable接口的;那么说白了其实就是直接执行我们提交任务的run方法了;如果为空的话,则会通过getTask方法从阻塞队列里面拿出一个任务去执行;在任务执行结束之后继续从阻塞队列里面拿任务,直到getTask的返回值为空则退出runWorker内部循环,那么什么情况下getTask返回为空呢?查看getTask方法的源码注释可以知道:在Worker必须需要退出的情况下getTask会返回空,具体什么情况下Worker会退出呢?(1):当Worker的数量超过maximumPoolSize的时候;(2):当线程池状态为Stop的时候;(3):当线程池状态为Shutdown并且阻塞队列为空的时候;(4):使用等待超时时间从阻塞队列中拿数据,但是超时之后仍然没有拿到数据;

如果runWorker方法退出了它里面的循环,那么就说明当前阻塞队列里面是没有任务可以执行的了,你可以看到在runWorker方法内部的finally语句块中执行了processWorkerExit方法,用来对Worker对象进行回收操作,这个方法会传入一个参数表示需要删除的Worker对象;在进行Worker回收的时候会调用tryTerminate方法来尝试关闭线程池,在tryTerminate方法里面会检查是否有Worker在工作,检查线程池的状态,没问题的话就会将当前线程池的状态过渡到Tidying,之后调用terminated方法,将线程池状态更新到Terminated;

从上面的分析中,我们可以看出线程池运行的4个阶段:

(1):poolSize corePoolSize,则直接创建新的线程(核心线程)来执行当前提交的任务;

(2):poolSize = corePoolSize,并且此时阻塞队列没有满,那么会将当前任务添加到阻塞队列中,如果此时存在工作线程(非核心线程)的话,那么会由工作线程来处理该阻塞队列中的任务,如果此时工作线程数量为0的话,那么会创建一个工作线程(非核心线程)出来;

(3):poolSize = corePoolSize,并且此时阻塞队列已经满了,那么会直接创建新的工作线程(非核心线程)来处理阻塞队列中的任务;

(4):poolSize = maximumPoolSize,并且此时阻塞队列也满了的话,那么会触发拒绝机制,具体决绝策略采用的是什么就要看我们创建ThreadPoolExecutor的时候传入的RejectExecutionHandler参数了;

接下来就是线程池是怎么做到重用线程的呢?

个人认为线程池里面重用线程的工作是在getTask里面实现的,在getTask里面是存在两个for死循环嵌套的,他会不断的从阻塞对列里面取出需要执行的任务,返回给我们的runWorker方法里面,而在runWorker方法里面只要getTask返回的任务不是空就会执行该任务的run方法来处理它,这样一直执行下去,直到getTask返回空为止,此时的情况就是阻塞队列里面没有任务了,这样一个线程处理完一个任务之后接着再处理阻塞队列中的另一个任务,当然在线程池中的不同线程是可以并发处理阻塞队列中的任务的,最后在阻塞队列内部不存在任务的时候会去判断是否需要回收Worker对象,其实Worker对象的个数就是线程池中线程的个数,至于什么情况才需要回收,上面已经说了,就是四种情况了;

最后就是线程池是怎样被关闭的呢?

涉及到线程池的关闭,需要用到两个方法,shutdown和shutdownNow,他们都是位于ThreadPoolExecutor里面的,对于shutdown的话,他会将线程池状态切换成Shutdown,此时是不会影响对阻塞队列中任务执行的,但是会拒绝执行新加进来的任务,同时会回收闲置的Worker;而shutdownNow方法会将线程池状态切换成Stop,此时既不会再去处理阻塞队列里面的任务,也不会去处理新加进来的任务,同时会回收所有Worker;

关于java线程池的拒绝实现和java线程池的拒绝策略的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。