「java线程池队列」java线程池队列持久化

博主:adminadmin 2023-03-19 18:09:11 407

今天给各位分享java线程池队列的知识,其中也会对java线程池队列持久化进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

超详细的线程池使用解析

Java 中线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。合理的使用线程池可以带来多个好处:

(1) 降低资源消耗 。通过重复利用已创建的线程降低线程在创建和销毁时造成的消耗。

(2) 提高响应速度 。当处理执行任务时,任务可以不需要等待线程的创建就能立刻执行。

(3) 提高线程的可管理性 。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的处理流程如上图所示

线程池中通过 ctl 字段来表示线程池中的当前状态,主池控制状态 ctl 是 AtomicInteger 类型,包装了两个概念字段:workerCount 和 runState,workerCount 表示有效线程数,runState 表示是否正在运行、正在关闭等状态。使用 ctl 字段表示两个概念,ctl 的前 3 位表示线程池状态,线程池中限制 workerCount 为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。workerCount 是允许启动和不允许停止的工作程序的数量。该值可能与实际的活动线程数暂时不同,例如,当 ThreadFactory 在被询问时未能创建线程时,以及退出线程在终止前仍在执行记时。用户可见的池大小报告为工作集的当前大小。 runState 提供主要的生命周期控制,取值如下表所示:

runState 随着时间的推移而改变,在 awaitTermination() 方法中等待的线程将在状态达到 TERMINATED 时返回。状态的转换为:

RUNNING - SHUTDOWN 在调用 shutdown() 时,可能隐含在 finalize() 中

(RUNNING 或 SHUTDOWN)- STOP 在调用 shutdownNow() 时

SHUTDOWN - TIDYING 当队列和线程池都为空时

STOP - TIDYING 当线程池为空时

TIDYING - TERMINATED 当 terminate() 方法完成时

开发人员如果需要在线程池变为 TIDYING 状态时进行相应的处理,可以通过重载 terminated() 函数来实现。

结合上图说明线程池 ThreadPoolExecutor 执行流程,使用 execute() 方法提交任务到线程池中执行时分为4种场景:

(1)线程池中运行的线程数量小于 corePoolSize,创建新线程来执行任务。

(2)线程池中运行线程数量不小于 corePoolSize,将任务加入到阻塞队列 BlockingQueue。

(3)如果无法将任务加入到阻塞队列(队列已满),创建新的线程来处理任务(这里需要获取全局锁)。

(4)当创建新的线程数量使线程池中当前运行线程数量超过 maximumPoolSize,线程池中拒绝任务,调用 RejectedExecutionHandler.rejectedExecution() 方法处理。

源码分析:

线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。

创建线程池之前,首先要知道创建线程池中的核心参数:

corePoolSize (核心线程数大小):当提交任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于核心线程数时就不再创建。

runnableTaskQueue (任务队列):用于保存等待执行任务的阻塞队列。一般选择以下几种:

ArrayBlockingQueue:基于数组的有界阻塞队列,按照 FIFO 原则对元素进行排序。

LinkedBlockingQueue:基于链表的阻塞队列,按照 FIFO 原则对元素进行排序。

SynchronousQueue:同步阻塞队列,也是不存储元素的阻塞队列。每一个插入操作必须要等到另一个 线程调用移除操作,否则插入操作一直处于阻塞状态。

PriorityBlockingQueue:优先阻塞队列,一个具有优先级的无限阻塞队列。

maximumPoolSize (最大线程数大小):线程池允许创建的最大线程数,当队列已满,并且线程池中的线程数小于最大线程数,则线程池会创建新的线程执行任务。当使用无界队列时,此参数无用。

RejectedExecutionHandler (拒绝策略):当任务队列和线程池都满了,说明线程池处于饱和状态,那么必须使用拒绝策略来处理新提交的任务。JDK 内置拒绝策略有以下 4 种:

AbortPolicy:直接抛出异常

CallerRunsPolicy:使用调用者所在的线程来执行任务

DiscardOldestPolicy:丢弃队列中最近的一个任务来执行当前任务

DiscardPolicy:直接丢弃不处理

可以根据应用场景来实现 RejectedExecutionHandler 接口自定义处理策略。

keepAliveTime (线程存活时间):线程池的工作线程空闲后,保持存活的时间。

TimeUnit (存活时间单位):可选单位DAYS(天)、HOURS(小时)、MINUTES(分钟)、MILLISECONDS(毫秒)、MICROSECONDS(微妙)、NANOSECONDS(纳秒)。

ThreadFactory (线程工厂):可以通过线程工厂给创建出来的线程设置有意义的名字。

创建线程池主要分为两大类,第一种是通过 Executors 工厂类创建线程池,第二种是自定义创建线程池。根据《阿里java开发手册》中的规范,线程池不允许使用 Executors 去创建,原因是规避资源耗尽的风险。

创建一个单线程化的线程池

创建固定线程数的线程池

以上两种创建线程池方式使用链表阻塞队列来存放任务,实际场景中可能会堆积大量请求导致 OOM

创建可缓存线程池

允许创建的线程数量最大为 Integer.MAX_VALUE,当创建大量线程时会导致 CPU 处于重负载状态和 OOM 的发生

向线程池提交任务可以使用两个方法,分别为 execute() 和 submit()。

execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute() 方法中传入的是 Runnable 类的实例。

submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值。get() 方法会阻塞当前线程直到任务完成,使用 get(long timeout, TimeUnit unit)方法会阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完。

可以通过调用线程池的 shutdown() 或shutdownNow() 方法来关闭线程池。他们的原理是遍历线程池中的工作线程,然后逐个调用 interrupt() 方法来中断线程,所以无法响应中断任务可能永远无法终止。

shutdown() 和 shutdownNow() 方法的区别在于 shutdownNow 方法首先将线程池的状态设置为 STOP,然后尝试停止正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

线程池使用面临的核心的问题在于: 线程池的参数并不好配置 。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。

(1)以任务型为参考的简单评估:

假设线程池大小的设置(N 为 CPU 的个数)

如果纯计算的任务,多线程并不能带来性能提升,因为 CPU 处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为 CPU 数量或+1;----为什么+1?因为可以防止 N 个线程中有一个线程意外中断或者退出,CPU 不会空闲等待。

如果是 IO 密集型应用, 则线程池大小设置为 2N+1. 线程数 = CPU 核数 目标 CPU 利用率 (1 + 平均等待时间 / 平均工作时间)

(2)以任务数为参考的理想状态评估:

1)默认值

2)如何设置 * 需要根据相关值来决定 - tasks :每秒的任务数,假设为500~1000 - taskCost:每个任务花费时间,假设为0.1s - responsetime:系统允许容忍的最大响应时间,假设为1s

以上都为理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器 cpu load 已经满了,则需要通过升级硬件和优化代码,降低 taskCost 来处理。

(仅为简单的理想状态的评估,可作为线程池参数设置的一个参考)

与主业务无直接数据依赖的从业务可以使用异步线程池来处理,在项目初始化时创建线程池并交给将从业务中的任务提交给异步线程池执行能够缩短响应时间。

严禁在业务代码中起线程!!!

当任务需要按照指定顺序(FIFO, LIFO, 优先级)执行时,推荐创建使用单线程化的线程池。

本文章主要说明了线程池的执行原理和创建方式以及推荐线程池参数设置和一般使用场景。在开发中,开发人员需要根据业务来合理的创建和使用线程池达到降低资源消耗,提高响应速度的目的。

原文链接:

4种线程池和7种并发队列

Java并发包中的阻塞队列一共7个,当然他们都是线程安全的。 

ArrayBlockingQueue:一个由数组结构组成的 有界 阻塞队列。 

LinkedBlockingQueue:一个由链表结构组成的无界阻塞队列。 

PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。 

DealyQueue:一个使用优先级(启动时间)队列实现的无界阻塞队列。 

SynchronousQueue:一个不存储元素的阻塞队列。 

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

常用的只有三个,重点是前两个

LinkedBlockingQueue和ArrayBlockingQueue区别:

1、LinkedBlockingQueue内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。而ArrayBlockingQueue的只使用一个ReentrantLock管理进出队列。

而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

2、队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。

3、数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。

由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

SynchronousQueue

没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素

使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作。

1、newfixed,线程默认大小确定、最大大小确定(实际没什么用),默认使用linkedblockqueue,无尽队列

危害在于这个等待队列,队列如果消费不及时不断膨胀可以使机器资源耗尽

ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。

2、cached,线程数不限大小

危害 本身就是没有限制,有多少请求创建多少线程,直到资源耗尽

CachedThreadPool使用没有容量的SynchronousQueue作为主线程池的工作队列,它是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作。这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU资源。

3、single

可顺序执行任务,同时只有一个线程处理(单线程)

执行过程如下:

1.如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。

2.当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。

3.线程执行完1中的任务后会从队列中去任务。

注意:由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。

4、ScheduledThreadPool

public ScheduledThreadPoolExecutor(int corePoolSize) {

        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

              new DelayedWorkQueue());

    }

使用了延迟队列,无界,和cached类似

如果运行的线程达到了corePoolSize,就把任务添加到任务队列DelayedWorkQueue中;DelayedWorkQueue会将任务排序,先执行的任务放在队列的前面。

任务执行完后,ScheduledFutureTask中的变量time改为下次要执行的时间,并放回到DelayedWorkQueue中

DelayQueue是一个没有边界BlockingQueue实现,加入其中的元素必需实现Delayed接口。 当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。

消费者线程查看队列头部的元素,注意是查看不是取出。然后调用元素的getDelay方法,如果此方法返回的值小0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果getDelay方法返回的值大于0,则消费者线程wait返回的时间值后,再从队列头部取出元素,此时元素应该已经到期。

DelayQueue是Leader-Followr模式的变种,消费者线程处于等待状态时,总是等待最先到期的元素,而不是长时间的等待。消费者线程尽量把时间花在处理任务上,最小化空等的时间,以提高线程的利用效率。

无论创建那种线程池 必须要调用ThreadPoolExecutor,以上四种都是调用这个实现的

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue,

RejectedExecutionHandler handler)

corePoolSize: 线程池维护线程的最少数量 

maximumPoolSize:线程池维护线程的最大数量 

keepAliveTime: 线程池维护线程所允许的空闲时间 , unit: 线程池维护线程所允许的空闲时间的单位 

workQueue: 线程池所使用的缓冲队列 

handler: 线程池对拒绝任务的处理策略 --饱和策略

-------------------------------------------------------------------

通用流程:

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。

当一个任务通过execute(Runnable)方法欲添加到线程池时:

如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

也就是:处理任务的优先级为:

核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

---------------------------------------------------------------------------------------------

unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:

NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

workQueue我常用的是:java.util.concurrent. ArrayBlockingQueue

handler有四个选择:

1、【抛异常】ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常

2、【重试】ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法

ThreadPoolExecutor.DiscardOldestPolicy()

3、【找个旧的停了】抛弃旧的任务 ThreadPoolExecutor.DiscardPolicy()

4、【不抛异常】抛弃当前的任务

5、当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义 饱和策略 ,如记录日志或持久化存储不能处理的任务。

java线程池(一) 简述线程池的几种使用方式

首先说明下java线程是如何实现线程重用的

1. 线程执行完一个Runnable的run()方法后,不会被杀死

2. 当线程被重用时,这个线程会进入新Runnable对象的run()方法12

java线程池由Executors提供的几种静态方法创建线程池。下面通过代码片段简单介绍下线程池的几种实现方式。后续会针对每个实现方式做详细的说明

newFixedThreadPool

创建一个固定大小的线程池

添加的任务达到线程池的容量之后开始加入任务队列开始线程重用总共开启线程个数跟指定容量相同。

@Test

public void newFixedThreadPool() throws Exception {

ExecutorService executorService = Executors.newFixedThreadPool(1);

executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().build());

RunThread run1 = new RunThread("run 1");

executorService.execute(run1);

executorService.shutdown();

}12345678

newSingleThreadExecutor

仅支持单线程顺序处理任务

@Test

public void newSingleThreadExecutor() throws Exception {

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build());

executorService.execute(new RunThread("run 1"));

executorService.execute(new RunThread("run 2"));

executorService.shutdown();

}123456789

newCachedThreadPool

这种情况跟第一种的方式类似,不同的是这种情况线程池容量上线是Integer.MAX_VALUE 并且线程池开启缓存60s

@Test

public void newCachedThreadPool() throws Exception {

ExecutorService executorService = Executors.newCachedThreadPool();

executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().build());

executorService.execute(new RunThread("run 1"));

executorService.execute(new RunThread("run 2"));

executorService.shutdown();

}123456789

newWorkStealingPool

支持给定的并行级别,并且可以使用多个队列来减少争用。

@Test

public void newWorkStealingPool() throws Exception {

ExecutorService executorService = Executors.newWorkStealingPool();

executorService = Executors.newWorkStealingPool(1);

RunThread run1 = new RunThread("run 1");

executorService.execute(run1);

executorService.shutdown();

}123456789

newScheduledThreadPool

看到的现象和第一种相同,也是在线程池满之前是新建线程,然后开始进入任务队列,进行线程重用

支持定时周期执行任务(还没有看完)

@Test

public void newScheduledThreadPool() throws Exception {

ExecutorService executorService = Executors.newScheduledThreadPool(1);

executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());

executorService.execute(new RunThread("run 1"));

executorService.execute(new RunThread("run 2"));

executorService.shutdown();

}

java线程组,线程池,线程队列分别是什么?有什么区别?

你好,我可以给你详细解释一下:

线程组表示一个线程的集合。此外,线程组也可以包含其他线程组。线程组构成一棵树,在树中,除了初始线程组外,每个线程组都有一个父线程组。

允许线程访问有关自己的线程组的信息,但是不允许它访问有关其线程组的父线程组或其他任何线程组的信息。

线程池:我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(Blocking Queue ),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。

线程池经常应用在多线程服务器上。每个通过网络到达服务器的连接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。以后会再深入有关 Java 实现多线程服务器的细节。

线程队列:是指线程处于拥塞的时候形成的调度队列

排队有三种通用策略:

直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

合理使用线程池以及线程变量

背景

随着计算技术的不断发展,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐渐面临巨大的物理瓶颈,通过多核处理器技术来提升服务器的性能成为提升算力的主要方向。

在服务器领域,基于java构建的后端服务器占据着领先地位,因此,掌握java并发编程技术,充分利用CPU的并发处理能力是一个开发人员必修的基本功,本文结合线程池源码和实践,简要介绍了线程池和线程变量的使用。

线程池概述

线程池是一种“池化”的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。

总体来说,线程池有如下的优势:

线程池的使用

在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:

可以通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创建一个线程池。

在构造函数中,corePoolSize为线程池核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。

在构造函数中,maximumPoolSize为线程池所能容纳的最大线程数。

在构造函数中,keepAliveTime表示线程闲置超时时长。如果线程闲置时间超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。

在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

在构造函数中,blockingQueue表示任务队列,线程池任务队列的常用实现类有:

在构造函数中,threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。如通过Google工具包可以设置线程池里的线程名:

在构造函数中,rejectedExecutionHandler表示拒绝策略。当达到最大线程数且队列任务已满时需要执行的拒绝策略,常见的拒绝策略如下:

ThreadPoolExecutor线程池有如下几种状态:

线程池提交一个任务时任务调度的主要步骤如下:

核心代码如下:

Tomcat 的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:

Tomcat为了实现请求的快速响应,使用线程池来提高请求的处理能力。下面我们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的分析。

在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。

核心部分代码如下:

其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。

Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。

Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。

在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200, 为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。

具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:

Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。

Executors常用方法有以下几个:

Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端 :

使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。

使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。

在工程实践中,通常使用下述公式来计算核心线程数:

nThreads=(w+c)/c*n*u=(w/c+1)*n*u

其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。

上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。

为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:

为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。

如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。

ThreadLocal线程变量概述

ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。

ThreadLocal的原理与实践

对于ThreadLocal而言,常用的方法有get/set/initialValue 3个方法。

众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁 两种方案外,我们还可以使用3)ThreadLocal的方案:

Thread 内部维护了一个 ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。

从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。

EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:

在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace方法进行初始化,在前置入参转换后,通过startTrace重载方法将鹰眼上下文参数存入ThreadLocal中,相关代码如下:

EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace方法结束调用链,通过clear方法将ThreadLocal中的数据进行清理,相关代码实现如下:

在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop sdk获取当前会话信息。

在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/dubbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:

【问题1:权益领取失败分析】

在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxModule形式,Module间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的module 的init方法先执行)。

在应用中,权益领取接口的主入口为CommonXXApplyModule类,CommonXXApplyModule依赖XXSessionModule。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionModule的init方法会优先执行;而开发同学在CommonXXApplyModule类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionModule模块获取不到正确的会话id等信息而导致权益领取失败。

【问题2:脏数据分析】

权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionModule通过mtop SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。

【解决方案】

在依赖注入框架入口处AbstractGate#visit(或在XXSessionModule中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。

若使用强引用类型,则threadlocal的引用链为:Thread - ThreadLocal.ThreadLocalMap - Entry[] - Entry - key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。

若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:

在上述main方法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实践中,使用threadlocal时通常期望一个线程只有一个threadlocal实例,因此,若不使用static修饰,期望的语义发生了变化,同时易引起内存泄漏。

如果不执行清理操作,则可能会出现:

建议使用try...finally 进行清理。

我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏()。

在应用中,谨慎使用ThreadLocal.withInitial(Supplier? extends S supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:

总结

在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。

线程池-参数篇:2.队列

多线程环境中,通过队列可以很容易实现线程间数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享;同时作为BlockingQueue的使用者,我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue的实现者都给一手包办了。

基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,另外还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。

基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。如果没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

DelayQueue用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

Delayed 是一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。

考虑以下场景:

一种笨笨的办法就是,使用一个后台线程,遍历所有对象,挨个检查。这种笨笨的办法简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,多小则存在效率问题。而且做不到按超时的时间顺序处理。

这场景,使用DelayQueue最适合了,详情查看 DelayedQueue学习笔记 ; 精巧好用的DelayQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),需要注意PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。

使用时,若生产者生产数据的速度快于消费者消费数据的速度,随着长时间的运行,可能会耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

SynchronousQueue是一个内部只能包含零个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取元素。同样,如果线程尝试获取元素并且当前没有线程在插入元素,则该线程将被阻塞,直到有线程将元素插入队列

声明一个SynchronousQueue有公平模式和非公平模式,区别如下:

参考: Java多线程-工具篇-BlockingQueue

12. SynchronousQueue

java线程池队列的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于java线程池队列持久化、java线程池队列的信息别忘了在本站进行查找喔。