「java线程驻留」java 常驻线程

博主:adminadmin 2022-12-11 16:36:08 76

今天给各位分享java线程驻留的知识,其中也会对java 常驻线程进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

Java线程池中的核心线程是如何被重复利用的

Java线程池中的核心线程是如何被重复利用的?

引言

在Java开发中,经常需要创建线程去执行一些任务,实现起来也非常方便,但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。此时,我们很自然会想到使用线程池来解决这个问题。

使用线程池的好处:

降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n多个子任务需要执行,如果我们为每个子任务都创建一个执行线程,而创建线程的过程是需要一定的系统消耗的,最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程,任务有多个,但执行任务的线程可以通过线程池来复用,这样减少了创建线程的开销,系统资源利用率得到了提升。

降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的,而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程,如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。

提升任务处理速度。线程池中长期驻留了一定数量的活线程,当任务需要执行时,我们不必先去创建线程,线程池会自己选择利用现有的活线程来处理任务。

很显然,线程池一个很显著的特征就是“长期驻留了一定数量的活线程”,避免了频繁创建线程和销毁线程的开销,那么它是如何做到的呢?我们知道一个线程只要执行完了run()方法内的代码,这个线程的使命就完成了,等待它的就是销毁。既然这是个“活线程”,自然是不能很快就销毁的。为了搞清楚这个“活线程”是如何工作的,下面通过追踪源码来看看能不能解开这个疑问。

分析方法

在分析源码之前先来思考一下要怎么去分析,源码往往是比较复杂的,如果知识储备不够丰厚,很有可能会读不下去,或者读岔了。一般来讲要时刻紧跟着自己的目标来看代码,跟目标关系不大的代码可以不理会它,一些异常的处理也可以暂不理会,先看正常的流程。就我们现在要分析的源码而言,目标就是看看线程是如何被复用的。那么对于线程池的状态的管理以及非正常状态下的处理代码就可以不理会,具体来讲,在ThreadPollExcutor类中,有一个字段 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 是对线程池的运行状态和线程池中有效线程的数量进行控制的, 它包含两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),还有几个对ctl进行计算的方法:

// 获取运行状态

private static int runStateOf(int c)     { return c ~CAPACITY; }

// 获取活动线程数

private static int workerCountOf(int c)  { return c CAPACITY; }123456

以上两个方法在源码中经常用到,结合我们的目标,对运行状态的一些判断及处理可以不用去管,而对当前活动线程数要加以关注等等。

下面将遵循这些原则来分析源码。

解惑

当我们要向线程池添加一个任务时是调用ThreadPollExcutor对象的execute(Runnable command)方法来完成的,所以先来看看ThreadPollExcutor类中的execute(Runnable command)方法的源码:

public void execute(Runnable command) {

   if (command == null)

       throw new NullPointerException();

   int c = ctl.get();

   if (workerCountOf(c) corePoolSize) {

       if (addWorker(command, true))

           return;

       c = ctl.get();

   }

   if (isRunning(c) workQueue.offer(command)) {

       int recheck = ctl.get();

       if (! isRunning(recheck) remove(command))

           reject(command);

       else if (workerCountOf(recheck) == 0)

           addWorker(null, false);

   }

   else if (!addWorker(command, false))

       reject(command);

}123456789101112131415161718192021

按照我们在分析方法中提到的一些原则,去掉一些相关性不强的代码,看看核心代码是怎样的。

// 为分析而简化后的代码

public void execute(Runnable command) {

   int c = ctl.get();

   if (workerCountOf(c) corePoolSize) {

       // 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中,并把任务添加到该线程中

       if (addWorker(command, true))

           return;

       c = ctl.get();

   }

   // 如果当前活动线程数大于等于corePoolSize,则尝试将任务放入缓存队列

   if (workQueue.offer(command)) {

       int recheck = ctl.get();

       if (workerCountOf(recheck) == 0)

           addWorker(null, false);

   }else {

       // 缓存已满,新建一个线程放入线程池,并把任务添加到该线程中(此时新建的线程相当于非核心线程)

       addWorker(command, false)

   }

}12345678910111213141516171819202122

这样一看,逻辑应该清晰很多了。

如果 当前活动线程数 指定的核心线程数,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);

如果 当前活动线程数 = 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;

如果 当前活动线程数 = 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);

接下来看 addWorker(Runnable firstTask, boolean core)方法

private boolean addWorker(Runnable firstTask, boolean core) {

   retry:

   for (;;) {

       int c = ctl.get();

       int rs = runStateOf(c);

       // Check if queue empty only if necessary.

       if (rs = SHUTDOWN

           ! (rs == SHUTDOWN

              firstTask == null

              ! workQueue.isEmpty()))

           return false;

       for (;;) {

           int wc = workerCountOf(c);

           if (wc = CAPACITY ||

               wc = (core ? corePoolSize : maximumPoolSize))

               return false;

           if (compareAndIncrementWorkerCount(c))

               break retry;

           c = ctl.get();  // Re-read ctl

           if (runStateOf(c) != rs)

               continue retry;

           // else CAS failed due to workerCount change; retry inner loop

       }

   }

   boolean workerStarted = false;

   boolean workerAdded = false;

   Worker w = null;

   try {

       w = new Worker(firstTask);

       final Thread t = w.thread;

       if (t != null) {

           final ReentrantLock mainLock = this.mainLock;

           mainLock.lock();

           try {

               // Recheck while holding lock.

               // Back out on ThreadFactory failure or if

               // shut down before lock acquired.

               int rs = runStateOf(ctl.get());

               if (rs SHUTDOWN ||

                   (rs == SHUTDOWN firstTask == null)) {

                   if (t.isAlive()) // precheck that t is startable

                       throw new IllegalThreadStateException();

                   workers.add(w);

                   int s = workers.size();

                   if (s largestPoolSize)

                       largestPoolSize = s;

                   workerAdded = true;

               }

           } finally {

               mainLock.unlock();

           }

           if (workerAdded) {

               t.start();

               workerStarted = true;

           }

       }

   } finally {

       if (! workerStarted)

           addWorkerFailed(w);

   }

   return workerStarted;

}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667

同样,我们也来简化一下:

// 为分析而简化后的代码

private boolean addWorker(Runnable firstTask, boolean core) {

   int wc = workerCountOf(c);

   if (wc = (core ? corePoolSize : maximumPoolSize))

       // 如果当前活动线程数 = 指定的核心线程数,不创建核心线程

       // 如果当前活动线程数 = 指定的最大线程数,不创建非核心线程  

       return false;

   boolean workerStarted = false;

   boolean workerAdded = false;

   Worker w = null;

   try {

       // 新建一个Worker,将要执行的任务作为参数传进去

       w = new Worker(firstTask);

       final Thread t = w.thread;

       if (t != null) {

           workers.add(w);

           workerAdded = true;

           if (workerAdded) {

               // 启动刚刚新建的那个worker持有的线程,等下要看看这个线程做了啥

               t.start();

               workerStarted = true;

           }

       }

   } finally {

       if (! workerStarted)

           addWorkerFailed(w);

   }

   return workerStarted;

}1234567891011121314151617181920212223242526272829303132

看到这里,我们大概能猜测到,addWorker方法的功能就是新建一个线程并启动这个线程,要执行的任务应该就是在这个线程中执行。为了证实我们的这种猜测需要再来看看Worker这个类。

private final class Worker

   extends AbstractQueuedSynchronizer

   implements Runnable{

   // ....

}

Worker(Runnable firstTask) {

   setState(-1); // inhibit interrupts until runWorker

   this.firstTask = firstTask;

   this.thread = getThreadFactory().newThread(this);

}123456789101112

从上面的Worker类的声明可以看到,它实现了Runnable接口,以及从它的构造方法中可以知道待执行的任务赋值给了它的变量firstTask,并以它自己为参数新建了一个线程赋值给它的变量thread,那么运行这个线程的时候其实就是执行Worker的run()方法,来看一下这个方法:

   public void run() {

       runWorker(this);

   }

   final void runWorker(Worker w) {

   Thread wt = Thread.currentThread();

   Runnable task = w.firstTask;

   w.firstTask = null;

   w.unlock(); // allow interrupts

   boolean completedAbruptly = true;

   try {

       while (task != null || (task = getTask()) != null) {

           w.lock();

           // If pool is stopping, ensure thread is interrupted;

           // if not, ensure thread is not interrupted.  This

           // requires a recheck in second case to deal with

           // shutdownNow race while clearing interrupt

           if ((runStateAtLeast(ctl.get(), STOP) ||

                (Thread.interrupted()

                 runStateAtLeast(ctl.get(), STOP)))

               !wt.isInterrupted())

               wt.interrupt();

           try {

               beforeExecute(wt, task);

               Throwable thrown = null;

               try {

                   task.run();

               } catch (RuntimeException x) {

                   thrown = x; throw x;

               } catch (Error x) {

                   thrown = x; throw x;

               } catch (Throwable x) {

                   thrown = x; throw new Error(x);

               } finally {

                   afterExecute(task, thrown);

               }

           } finally {

               task = null;

               w.completedTasks++;

               w.unlock();

           }

       }

       completedAbruptly = false;

   } finally {

       processWorkerExit(w, completedAbruptly);

   }

}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748

在run()方法中只调了一下 runWorker(this) 方法,再来简化一下这个 runWorker() 方法

// 为分析而简化后的代码

final void runWorker(Worker w) {

   Runnable task = w.firstTask;

   w.firstTask = null;

   while (task != null || (task = getTask()) != null) {

           try {

               task.run();

           } finally {

               task = null;

           }

       }

}12345678910111213

很明显,runWorker()方法里面执行了我们新建Worker对象时传进去的待执行的任务,到这里为止貌似这个worker的run()方法就执行完了,既然执行完了那么这个线程也就没用了,只有等待虚拟机销毁了。那么回顾一下我们的目标:Java线程池中的核心线程是如何被重复利用的?好像并没有重复利用啊,新建一个线程,执行一个任务,然后就结束了,销毁了。没什么特别的啊,难道有什么地方漏掉了,被忽略了?再仔细看一下runWorker()方法的代码,有一个while循环,当执行完firstTask后task==null了,那么就会执行判断条件 (task = getTask()) != null,我们假设这个条件成立的话,那么这个线程就不止只执行一个任务了,可以执行多个任务了,也就实现了重复利用了。答案呼之欲出了,接着看getTask()方法

private Runnable getTask() {

   boolean timedOut = false; // Did the last poll() time out?

   for (;;) {

       int c = ctl.get();

       int rs = runStateOf(c);

       // Check if queue empty only if necessary.

       if (rs = SHUTDOWN (rs = STOP || workQueue.isEmpty())) {

           decrementWorkerCount();

           return null;

       }

       int wc = workerCountOf(c);

       // Are workers subject to culling?

       boolean timed = allowCoreThreadTimeOut || wc corePoolSize;

       if ((wc maximumPoolSize || (timed timedOut))

            (wc 1 || workQueue.isEmpty())) {

           if (compareAndDecrementWorkerCount(c))

               return null;

           continue;

       }

       try {

           Runnable r = timed ?

               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

               workQueue.take();

           if (r != null)

               return r;

           timedOut = true;

       } catch (InterruptedException retry) {

           timedOut = false;

       }

   }

}1234567891011121314151617181920212223242526272829303132333435363738

老规矩,简化一下代码来看:

// 为分析而简化后的代码

private Runnable getTask() {

   boolean timedOut = false;

   for (;;) {

       int c = ctl.get();

       int wc = workerCountOf(c);

       // timed变量用于判断是否需要进行超时控制。

       // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;

       // wc corePoolSize,表示当前线程池中的线程数量大于核心线程数量;

       // 对于超过核心线程数量的这些线程,需要进行超时控制

       boolean timed = allowCoreThreadTimeOut || wc corePoolSize;

       if (timed timedOut) {

           // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,

           // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了

           if (compareAndDecrementWorkerCount(c))

               return null;

           continue;

       }

       try {

           Runnable r = timed ?

               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

               workQueue.take();

           // 注意workQueue中的poll()方法与take()方法的区别

           //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null

           //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止

           if (r != null)

               return r;

           timedOut = true;

       } catch (InterruptedException retry) {

           timedOut = false;

       }

   }

}123456789101112131415161718192021222324252627282930313233343536373839

从以上代码可以看出,getTask()的作用是

如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程。

如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

小结

通过以上的分析,应该算是比较清楚地解答了“线程池中的核心线程是如何被重复利用的”这个问题,同时也对线程池的实现机制有了更进一步的理解:

当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。

当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。

注意: 

本文所说的“核心线程”、“非核心线程”是一个虚拟的概念,是为了方便描述而虚拟出来的概念,在代码中并没有哪个线程被标记为“核心线程”或“非核心线程”,所有线程都是一样的,只是当线程池中的线程多于指定的核心线程数量时,会将多出来的线程销毁掉,池中只保留指定个数的线程。那些被销毁的线程是随机的,可能是第一个创建的线程,也可能是最后一个创建的线程,或其它时候创建的线程。一开始我以为会有一些线程被标记为“核心线程”,而其它的则是“非核心线程”,在销毁多余线程的时候只销毁那些“非核心线程”,而“核心线程”不被销毁。这种理解是错误的。

另外还有一个重要的接口 BlockingQueue 值得去了解,它定义了一些入队出队同步操作的方法,还可以阻塞,作用很大。

Java 线程池的问题

你的理解没毛病。

核心线程数(corePoolSize):核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。

最大线程数(maxPoolSize):当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。

线程池按以下行为执行任务

当线程数小于核心线程数时,创建线程。

当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。

当线程数大于等于核心线程数,且任务队列已满,1、若线程数小于最大线程数,创建线程;2、若线程数等于最大线程数,抛出异常,拒绝任务

javase线程怎么存储到容器

Java 并发重要知识点

java 线程池

ThreadPoolExecutor 类分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。

/**

* 用给定的初始参数创建一个新的ThreadPoolExecutor。

*/

public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量

int maximumPoolSize,//线程池的最大线程数

long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间

TimeUnit unit,//时间单位

BlockingQueueRunnable workQueue,//任务队列,用来储存等待执行任务的队列

ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可

RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务

) {

if (corePoolSize 0 ||

maximumPoolSize = 0 ||

maximumPoolSize corePoolSize ||

keepAliveTime 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

下面这些对创建非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。

ThreadPoolExecutor 3 个最重要的参数:

corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。

maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;

unit : keepAliveTime 参数的时间单位。

threadFactory :executor 创建新线程的时候会用到。

handler :饱和策略。关于饱和策略下面单独介绍一下。

下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nzbqGRz9-1654600571133)()]

ThreadPoolExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:

ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException来拒绝新任务的处理。

ThreadPoolExecutor.CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。

ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉。

ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求。

举个例子:

Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)

推荐使用 ThreadPoolExecutor 构造函数创建线程池

在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

为什么呢?

使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下(后文会详细介绍到):

FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

方式一:通过ThreadPoolExecutor构造函数实现(推荐)通过构造方法实现

方式二:通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor:

FixedThreadPool

SingleThreadExecutor

CachedThreadPool

对应 Executors 工具类中的方法如图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YGd4ygZu-1654600571136)()]

正确配置线程池参数

说到如何给线程池配置参数,美团的骚操作至今让我难忘(后面会提到)!

我们先来看一下各种书籍和博客上一般推荐的配置线程池参数的方式,可以作为参考!

常规操作

很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。

上下文切换:

多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。

上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。

Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。

类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。

如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。

但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

有一个简单并且适用面比较广的公式:

CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

美团的骚操作

美团技术团队在《Java线程池实现原理及其在美团业务中的实践》open in new window这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。

美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:

corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。

maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

为什么是这三个参数?

我在这篇《新手也能看懂的线程池学习总结》open in new window 中就说过这三个参数是 ThreadPoolExecutor 最重要的参数,它们基本决定了线程池对于任务的处理策略。

如何支持参数动态配置? 且看 ThreadPoolExecutor 提供的下面这些方法。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Sm89qdJZ-1654600571137)()]

格外需要注意的是corePoolSize, 程序运行期间的时候,我们调用 setCorePoolSize() 这个方法的话,线程池会首先判断当前工作线程数是否大于corePoolSize,如果大于的话就会回收工作线程。

另外,你也看到了上面并没有动态指定队列长度的方法,美团的方式是自定义了一个叫做 ResizableCapacityLinkedBlockIngQueue 的队列(主要就是把LinkedBlockingQueue的capacity 字段的final关键字修饰给去掉了,让它变为可变的)。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cmNN5yAL-1654600571138)()]

还没看够?推荐 why神的[《如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。》open in new window](如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。 (qq.com))这篇文章,深度剖析,很不错哦!

Java 常见并发容器

JDK 提供的这些容器大部分在 java.util.concurrent 包中。

ConcurrentHashMap : 线程安全的 HashMap

CopyOnWriteArrayList : 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector。

ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列。

BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。

ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。

ConcurrentHashMap

我们知道 HashMap 不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap() 方法来包装我们的 HashMap。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。

所以就有了 HashMap 的线程安全版本—— ConcurrentHashMap 的诞生。

在 ConcurrentHashMap 中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

CopyOnWriteArrayList

CopyOnWriteArrayList 简介

public class CopyOnWriteArrayListE

extends Object

implements ListE, RandomAccess, Cloneable, Serializable

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操作是安全的。

这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做的呢?

CopyOnWriteArrayList 是如何做到的?

CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的。所谓 CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。

CopyOnWriteArrayList 读取和写入源码简单分析

CopyOnWriteArrayList 读取操作的实现

读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

/** The array, accessed only via getArray/setArray. */

private transient volatile Object[] array;

public E get(int index) {

return get(getArray(), index);

}

@SuppressWarnings("unchecked")

private E get(Object[] a, int index) {

return (E) a[index];

}

final Object[] getArray() {

return array;

}

CopyOnWriteArrayList 写入操作的实现

CopyOnWriteArrayList 写入操作 add()方法在添加集合的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来。

/**

* Appends the specified element to the end of this list.

*

* @param e element to be appended to this list

* @return {@code true} (as specified by {@link Collection#add})

*/

public boolean add(E e) {

final ReentrantLock lock = this.lock;

lock.lock();//加锁

try {

Object[] elements = getArray();

int len = elements.length;

Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝新数组

newElements[len] = e;

setArray(newElements);

return true;

} finally {

lock.unlock();//释放锁

}

}

ConcurrentLinkedQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出,ConcurrentLinkedQueue这个队列使用链表作为其数据结构.ConcurrentLinkedQueue 应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 内部代码我们就不分析了,大家知道 ConcurrentLinkedQueue 主要使用 CAS 非阻塞算法来实现线程安全就好了。

ConcurrentLinkedQueue 适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue 来替代。

BlockingQueue

BlockingQueue 简介

上面我们己经提到了 ConcurrentLinkedQueue 作为高性能的非阻塞队列。下面我们要讲到的是阻塞队列——BlockingQueue。阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。下面是 BlockingQueue 的相关实现类:

BlockingQueue 的实现类

下面主要介绍一下 3 个常见的 BlockingQueue 的实现类:ArrayBlockingQueue、LinkedBlockingQueue 、PriorityBlockingQueue 。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。

public class ArrayBlockingQueueE

extends AbstractQueueE

implements BlockingQueueE, Serializable{}

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。

ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的 ArrayBlockingQueue,可采用如下代码:

private static ArrayBlockingQueueInteger blockingQueue = new ArrayBlockingQueueInteger(10,true);

1

1

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE 。

相关构造方法:

/**

*某种意义上的无界队列

* Creates a {@code LinkedBlockingQueue} with a capacity of

* {@link Integer#MAX_VALUE}.

*/

public LinkedBlockingQueue() {

this(Integer.MAX_VALUE);

}

/**

*有界队列

* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.

*

* @param capacity the capacity of this queue

* @throws IllegalArgumentException if {@code capacity} is not greater

* than zero

*/

public LinkedBlockingQueue(int capacity) {

if (capacity = 0) throw new IllegalArgumentException();

this.capacity = capacity;

last = head = new NodeE(null);

}

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。

PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock,队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

推荐文章: 《解读 Java 并发队列 BlockingQueue》open in new window

ConcurrentSkipListMap

下面这部分内容参考了极客时间专栏《数据结构与算法之美》open in new window以及《实战 Java 高并发程序设计》。

为了引出 ConcurrentSkipListMap,先带着大家简单理解一下跳表。

对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。

跳表的本质是同时维护了多个链表,并且链表是分层的,

2级索引跳表

最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。

跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素 18。

在跳表中查找元素18

查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。

从上面很容易看出,跳表是一种利用空间换时间的算法。

使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap。

如何解释一个java线程的堆栈

itjobJava老师讲过:1) 线程堆栈概述及基础知识

2) 线程堆栈的生成原理以及相关工具

3) 不同JVM线程堆栈的格式的差异(Sun HotSpot、IBM JRE、Oracal JRockit)

4) 线程堆栈日志介绍以及解析方法

5) 线程堆栈的分析和相关的技术

6) 常见的问题模板(线程竟态、死锁、IO调用挂死、垃圾回收/OutOfMemoryError问题、死循环等)

7) 线程堆栈问题实例分析

我希望这一系列的培训能给你带来确实的帮助,所以请持续关注每周的文章更新。

但是如果我在学习过程中有疑问或者无法理解文章中的内容该怎么办?

不用担心,把我当做你的导师就好。任何关于线程堆栈的问题都可以咨询我(前提是问题不能太low)。请随意选择下面的几种方式与我取得联系:

1) 直接本文下面发表评论(不好意思的话可以匿名)

2) 将你的线程堆栈数据提交到Root Cause Analysis forum

3) 发Email给我,地址是 @phcharbonneau@hotmail.com

能帮我分析我们产品上遇到的问题么?

当然可以,如果你愿意的话可以把你的堆栈现场数据通过邮件或论坛 Root Cause Analysis forum发给我。处理实际问题是才是学习提升技能的王道。

我真心期望大家能够喜欢这个培训。所以我会尽我所能去为你提供高质量的材料,并回答大家的各种问题。

在介绍线程堆栈分析技术和问题模式之前,先要给大家讲讲基础的内容。所以在这篇帖子里,我将先覆盖到最基本的内容,这样大家就能更好的去理解JVM、中间件、以及Java EE容器之间的交互。

Java VM 概述

Java虚拟机是Jave EE 平台的基础。它是中间件和应用程序被部署和运行的地方。

JVM向中间件软件和你的Java/Java EE程序提供了下面这些东西:

– (二进制形式的)Java / Java EE 程序运行环境

– 一些程序功能特性和工具 (IO 基础设施,数据结构,线程管理,安全,监控 等等.)

– 借助垃圾回收的动态内存分配与管理

你的JVM可以驻留在许多的操作系统 (Solaris, AIX, Windows 等等.)之上,并且能根据你的物理服务器配置,你可以在每台物理/虚拟服务器上安装1到多个JVM进程.

JVM与中间件之间的交互

下面这张图展示了JVM、中间件和应用程序之间的高层交互模型。

如你所见,标准Java EE应用程序的线程的分配实在中间件内核与JVM之间完成的。(当然也有例外,应用程序可以直接调用API来创建线程,这种做法并不常见,而且在使用的过程中也要特别的小心)

同时,请注意一些线程是由JVM内部来进行管理的,典型的例子就是垃圾回收线程,JVM内部使用这个线程来做并行的垃圾回收处理。

因为大多数的线程分配都是由Java EE容器完成的,所以能够理解和认识线程堆栈跟踪,并能从线程堆栈数据中识别出它来,对你而言很重要. 这可以让你能够快速的知道Java EE容器正要执行的是什么类型的请求.

从一个线程转储堆栈的分析角度来看,你将能了解从JVM发现的线程池之间的不同,并识别出请求的类型.

最后一节会向你提供对于HotSop VM而言什么是JVM线程堆栈的一个概述,还有你将会遇到的各种不同的线程. 而对 IBM VM 线程堆栈形式详细内容将会在第四节向你提供.

请注意你可以从根本原因分析论坛获得针对本文的线程堆栈示例.

JVM 线程堆栈——它是什么?

JVM线程堆栈是一个给定时间的快照,它能向你提供所有被创建出来的Java线程的完整清单.

java 线程有哪些状态,这些状态之间是如何转化的

线程在它的生命周期中会处于各种不同的状态:

新建、等待、就绪、运行、阻塞、死亡。

1 新建

用new语句创建的线程对象处于新建状态,此时它和其他java对象一样,仅被分配了内存。

2等待

当线程在new之后,并且在调用start方法前,线程处于等待状态。

3 就绪

当一个线程对象创建后,其他线程调用它的start()方法,该线程就进入就绪状态。处于这个状态的线程位于Java虚拟机的可运行池中,等待cpu的使用权。

4 运行状态

处于这个状态的线程占用CPU,执行程序代码。在并发运行环境中,如果计算机只有一个CPU,那么任何时刻只会有一个线程处于这个状态。

只有处于就绪状态的线程才有机会转到运行状态。

5 阻塞状态

阻塞状态是指线程因为某些原因放弃CPU,暂时停止运行。当线程处于阻塞状态时,Java虚拟机不会给线程分配CPU,直到线程重新进入就绪状态,它才会有机会获得运行状态。

6 死亡状态

当线程执行完run()方法中的代码,或者遇到了未捕获的异常,就会退出run()方法,此时就进入死亡状态,该线程结束生命周期。

由于java线程调度不是分时的,如果程序希望干预java虚拟机对线程的调度过程,从而明确地让一个线程给另外一个线程运行的机会,可以采用以下的方法

1 调整各个线程的优先级

2 让处于运行状态的线程调用Thread.sleep(long time)方法 放弃CPU 进入阻塞状态。

sleep方法可能抛出InterruptedException

线程休眠后只能在指定的时间后使线程处于就绪状态。(也就是等待cpu的调度)

3 让处于运行状态的线程调用Thread.yield()方法 只会同优先级让步或更高优先级让步,进入就绪状态。

4 让处于运行状态的线程调用另一个线程的join()方法

当前运行的线程可以调用另一个线程的join()方法,当前运行的线程将转到阻塞状态,直至另一个线程运行结束,它才会转到就绪状态 从而有机会恢复运行。

通过一下几种途径中的一种,线程可以从被阻塞状态到可运行状态。

1 线程被置于睡眠状态,且已经经过指定的毫秒数。

2 线程正在等待I/O操作的完成,且该操作已经完成。

3 线程正在等待另一个线程所持有的锁,且另一个线程已经释放该锁的所有权;(也有可能等待超时。当超时发生时,线程解除阻塞。)

4 线程正在等待某个触发条件,且另一个线程发出了信号表明条件已经发生了变化。(如果为线程的等待设置了一个超时,那么当超时发生时该线程将解除阻塞。)

5 线程已经被挂起,且有人调用了它的resume方法。不过,由于suspend方法已经过时,resume方法也就随之被弃用了,你不应该在自己的代码里调用它。(现在应该用sleep取而代之。)

java线程驻留的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于java 常驻线程、java线程驻留的信息别忘了在本站进行查找喔。

The End

发布于:2022-12-11,除非注明,否则均为首码项目网原创文章,转载请注明出处。