「java线程池方法」java线程池的工作流程
本篇文章给大家谈谈java线程池方法,以及java线程池的工作流程对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
java线程池(一) 简述线程池的几种使用方式
首先说明下java线程是如何实现线程重用的
1. 线程执行完一个Runnable的run()方法后,不会被杀死
2. 当线程被重用时,这个线程会进入新Runnable对象的run()方法12
java线程池由Executors提供的几种静态方法创建线程池。下面通过代码片段简单介绍下线程池的几种实现方式。后续会针对每个实现方式做详细的说明
newFixedThreadPool
创建一个固定大小的线程池
添加的任务达到线程池的容量之后开始加入任务队列开始线程重用总共开启线程个数跟指定容量相同。
@Test
public void newFixedThreadPool() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().build());
RunThread run1 = new RunThread("run 1");
executorService.execute(run1);
executorService.shutdown();
}12345678
newSingleThreadExecutor
仅支持单线程顺序处理任务
@Test
public void newSingleThreadExecutor() throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}123456789
newCachedThreadPool
这种情况跟第一种的方式类似,不同的是这种情况线程池容量上线是Integer.MAX_VALUE 并且线程池开启缓存60s
@Test
public void newCachedThreadPool() throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}123456789
newWorkStealingPool
支持给定的并行级别,并且可以使用多个队列来减少争用。
@Test
public void newWorkStealingPool() throws Exception {
ExecutorService executorService = Executors.newWorkStealingPool();
executorService = Executors.newWorkStealingPool(1);
RunThread run1 = new RunThread("run 1");
executorService.execute(run1);
executorService.shutdown();
}123456789
newScheduledThreadPool
看到的现象和第一种相同,也是在线程池满之前是新建线程,然后开始进入任务队列,进行线程重用
支持定时周期执行任务(还没有看完)
@Test
public void newScheduledThreadPool() throws Exception {
ExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
executorService.execute(new RunThread("run 1"));
executorService.execute(new RunThread("run 2"));
executorService.shutdown();
}
Java实现通用线程池
线程池通俗的描述就是预先创建若干空闲线程 等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务 这样就省去了频繁创建线程的时间 因为频 繁创建线程是要耗费大量的CPU资源的 如果一个应用程序需要频繁地处理大量并发事务 不断的创建销毁线程往往会大大地降低系统的效率 这时候线程池就派 上用场了
本文旨在使用Java语言编写一个通用的线程池 当需要使用线程池处理事务时 只需按照指定规范封装好事务处理对象 然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可 并实现线程池的动态修改(修改当前线程数 最大线程数等) 下面是实现代码
//ThreadTask java
package polarman threadpool;
/** *//**
*线程任务
* @author ryang
*
*/
public interface ThreadTask {
public void run();
}
//PooledThread java
package polarman threadpool;
import java util Collection; import java util Vector;
/** *//**
*接受线程池管理的线程
* @author ryang
*
*/
public class PooledThread extends Thread {
protected Vector tasks = new Vector();
protected boolean running = false;
protected boolean stopped = false;
protected boolean paused = false;
protected boolean killed = false;
private ThreadPool pool;
public PooledThread(ThreadPool pool) { this pool = pool;
}
public void putTask(ThreadTask task) { tasks add(task);
}
public void putTasks(ThreadTask[] tasks) { for(int i= ; itasks length; i++) this tasks add(tasks[i]);
}
public void putTasks(Collection tasks) { this tasks addAll(tasks);
}
protected ThreadTask popTask() { if(tasks size() ) return (ThreadTask)tasks remove( );
else
return null;
}
public boolean isRunning() {
return running;
}
public void stopTasks() {
stopped = true;
}
public void stopTasksSync() {
stopTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void pauseTasks() {
paused = true;
}
public void pauseTasksSync() {
pauseTasks();
while(isRunning()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public void kill() { if(!running)
interrupt();
else
killed = true;
}
public void killSync() {
kill();
while(isAlive()) { try {
sleep( );
} catch (InterruptedException e) {
}
}
}
public synchronized void startTasks() {
running = true;
this notify();
}
public synchronized void run() { try { while(true) { if(!running || tasks size() == ) { pool notifyForIdleThread(); //System out println(Thread currentThread() getId() + : 空闲 ); this wait(); }else {
ThreadTask task;
while((task = popTask()) != null) { task run(); if(stopped) {
stopped = false;
if(tasks size() ) { tasks clear(); System out println(Thread currentThread() getId() + : Tasks are stopped );
break;
}
}
if(paused) {
paused = false;
if(tasks size() ) { System out println(Thread currentThread() getId() + : Tasks are paused );
break;
}
}
}
running = false;
}
if(killed) {
killed = false;
break;
}
}
}catch(InterruptedException e) {
return;
}
//System out println(Thread currentThread() getId() + : Killed );
}
}
//ThreadPool java
package polarman threadpool;
import java util Collection; import java util Iterator; import java util Vector;
/** *//**
*线程池
* @author ryang
*
*/
public class ThreadPool {
protected int maxPoolSize;
protected int initPoolSize;
protected Vector threads = new Vector();
protected boolean initialized = false;
protected boolean hasIdleThread = false;
public ThreadPool(int maxPoolSize int initPoolSize) { this maxPoolSize = maxPoolSize; this initPoolSize = initPoolSize;
}
public void init() {
initialized = true;
for(int i= ; iinitPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
//System out println( 线程池初始化结束 线程数= + threads size() + 最大线程数= + maxPoolSize);
}
public void setMaxPoolSize(int maxPoolSize) { //System out println( 重设最大线程数 最大线程数= + maxPoolSize); this maxPoolSize = maxPoolSize;
if(maxPoolSize getPoolSize())
setPoolSize(maxPoolSize);
}
/** *//**
*重设当前线程数
* 若需杀掉某线程 线程不会立刻杀掉 而会等到线程中的事务处理完成* 但此方法会立刻从线程池中移除该线程 不会等待事务处理结束
* @param size
*/
public void setPoolSize(int size) { if(!initialized) {
initPoolSize = size;
return;
}else if(size getPoolSize()) { for(int i=getPoolSize(); isize imaxPoolSize; i++) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
}
}else if(size getPoolSize()) { while(getPoolSize() size) { PooledThread th = (PooledThread)threads remove( ); th kill();
}
}
//System out println( 重设线程数 线程数= + threads size());
}
public int getPoolSize() { return threads size();
}
protected void notifyForIdleThread() {
hasIdleThread = true;
}
protected boolean waitForIdleThread() {
hasIdleThread = false;
while(!hasIdleThread getPoolSize() = maxPoolSize) { try { Thread sleep( ); } catch (InterruptedException e) {
return false;
}
}
return true;
}
public synchronized PooledThread getIdleThread() { while(true) { for(Iterator itr=erator(); itr hasNext();) { PooledThread th = (PooledThread)itr next(); if(!th isRunning())
return th;
}
if(getPoolSize() maxPoolSize) {
PooledThread thread = new PooledThread(this);
thread start(); threads add(thread);
return thread;
}
//System out println( 线程池已满 等待 );
if(waitForIdleThread() == false)
return null;
}
}
public void processTask(ThreadTask task) {
PooledThread th = getIdleThread();
if(th != null) { th putTask(task); th startTasks();
}
}
public void processTasksInSingleThread(ThreadTask[] tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
public void processTasksInSingleThread(Collection tasks) {
PooledThread th = getIdleThread();
if(th != null) { th putTasks(tasks); th startTasks();
}
}
}
下面是线程池的测试程序
//ThreadPoolTest java
import java io BufferedReader; import java io IOException; import java io InputStreamReader;
import polarman threadpool ThreadPool; import polarman threadpool ThreadTask;
public class ThreadPoolTest {
public static void main(String[] args) { System out println( quit 退出 ); System out println( task A 启动任务A 时长为 秒 ); System out println( size 设置当前线程池大小为 ); System out println( max 设置线程池最大线程数为 ); System out println();
final ThreadPool pool = new ThreadPool( ); pool init();
Thread cmdThread = new Thread() { public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System in));
while(true) { try { String line = reader readLine(); String words[] = line split( ); if(words[ ] equalsIgnoreCase( quit )) { System exit( ); }else if(words[ ] equalsIgnoreCase( size ) words length = ) { try { int size = Integer parseInt(words[ ]); pool setPoolSize(size); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( max ) words length = ) { try { int max = Integer parseInt(words[ ]); pool setMaxPoolSize(max); }catch(Exception e) {
}
}else if(words[ ] equalsIgnoreCase( task ) words length = ) { try { int timelen = Integer parseInt(words[ ]); SimpleTask task = new SimpleTask(words[ ] timelen * ); pool processTask(task); }catch(Exception e) {
}
}
} catch (IOException e) { e printStackTrace();
}
}
}
};
cmdThread start();
/**//*
for(int i= ; i ; i++){
SimpleTask task = new SimpleTask( Task + i (i+ )* ); pool processTask(task);
}*/
}
}
class SimpleTask implements ThreadTask {
private String taskName;
private int timeLen;
public SimpleTask(String taskName int timeLen) { this taskName = taskName; this timeLen = timeLen;
}
public void run() { System out println(Thread currentThread() getId() +
: START TASK + taskName + );
try { Thread sleep(timeLen); } catch (InterruptedException e) {
}
System out println(Thread currentThread() getId() +
: END TASK + taskName + );
}
}
使用此线程池相当简单 下面两行代码初始化线程池
ThreadPool pool = new ThreadPool( ); pool init();
要处理的任务实现ThreadTask 接口即可(如测试代码里的SimpleTask) 这个接口只有一个方法run()
两行代码即可调用
lishixinzhi/Article/program/Java/hx/201311/27203
java线程池有几种
一:newCachedThreadPool
(1)缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse,如果没有,就建立一个新的线程加入池中;
(2)缓存型池子,通常用于执行一些生存周期很短的异步型任务;因此一些面向连接的daemon型server中用得不多;
(3)能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
(4)注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止
二:newFixedThreadPool
(1)newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
(2)其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
(3)和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
(4)从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:
fixed池线程数固定,并且是0秒IDLE(无IDLE)
cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE
三:ScheduledThreadPool
(1)调度型线程池
(2)这个池子里的线程可以按schedule依次delay执行,或周期执行
四:SingleThreadExecutor
(1)单例线程,任意时间池中只能有一个线程
(2)用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)
java线程池怎么实现的
线程池简介:
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:
假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
代码实现中并没有实现任务接口,而是把Runnable对象加入到线程池管理器(ThreadPool),然后剩下的事情就由线程池管理器(ThreadPool)来完成了
package mine.util.thread;
import java.util.LinkedList;
import java.util.List;
/**
* 线程池类,线程管理器:创建线程,执行任务,销毁线程,获取线程基本信息
*/
public final class ThreadPool {
// 线程池中默认线程的个数为5
private static int worker_num = 5;
// 工作线程
private WorkThread[] workThrads;
// 未处理的任务
private static volatile int finished_task = 0;
// 任务队列,作为一个缓冲,List线程不安全
private ListRunnable taskQueue = new LinkedListRunnable();
private static ThreadPool threadPool;
// 创建具有默认线程个数的线程池
private ThreadPool() {
this(5);
}
// 创建线程池,worker_num为线程池中工作线程的个数
private ThreadPool(int worker_num) {
ThreadPool.worker_num = worker_num;
workThrads = new WorkThread[worker_num];
for (int i = 0; i worker_num; i++) {
workThrads[i] = new WorkThread();
workThrads[i].start();// 开启线程池中的线程
}
}
// 单态模式,获得一个默认线程个数的线程池
public static ThreadPool getThreadPool() {
return getThreadPool(ThreadPool.worker_num);
}
// 单态模式,获得一个指定线程个数的线程池,worker_num(0)为线程池中工作线程的个数
// worker_num=0创建默认的工作线程个数
public static ThreadPool getThreadPool(int worker_num1) {
if (worker_num1 = 0)
worker_num1 = ThreadPool.worker_num;
if (threadPool == null)
threadPool = new ThreadPool(worker_num1);
return threadPool;
}
// 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(Runnable task) {
synchronized (taskQueue) {
taskQueue.add(task);
taskQueue.notify();
}
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(Runnable[] task) {
synchronized (taskQueue) {
for (Runnable t : task)
taskQueue.add(t);
taskQueue.notify();
}
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(ListRunnable task) {
synchronized (taskQueue) {
for (Runnable t : task)
taskQueue.add(t);
taskQueue.notify();
}
}
// 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
public void destroy() {
while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 工作线程停止工作,且置为null
for (int i = 0; i worker_num; i++) {
workThrads[i].stopWorker();
workThrads[i] = null;
}
threadPool=null;
taskQueue.clear();// 清空任务队列
}
// 返回工作线程的个数
public int getWorkThreadNumber() {
return worker_num;
}
// 返回已完成任务的个数,这里的已完成是只出了任务队列的任务个数,可能该任务并没有实际执行完成
public int getFinishedTasknumber() {
return finished_task;
}
// 返回任务队列的长度,即还没处理的任务个数
public int getWaitTasknumber() {
return taskQueue.size();
}
// 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
@Override
public String toString() {
return "WorkThread number:" + worker_num + " finished task number:"
+ finished_task + " wait task number:" + getWaitTasknumber();
}
/**
* 内部类,工作线程
*/
private class WorkThread extends Thread {
// 该工作线程是否有效,用于结束该工作线程
private boolean isRunning = true;
/*
* 关键所在啊,如果任务队列不空,则取出任务执行,若任务队列空,则等待
*/
@Override
public void run() {
Runnable r = null;
while (isRunning) {// 注意,若线程无效则自然结束run方法,该线程就没用了
synchronized (taskQueue) {
while (isRunning taskQueue.isEmpty()) {// 队列为空
try {
taskQueue.wait(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!taskQueue.isEmpty())
r = taskQueue.remove(0);// 取出任务
}
if (r != null) {
r.run();// 执行任务
}
finished_task++;
r = null;
}
}
// 停止工作,让该线程自然执行完run方法,自然结束
public void stopWorker() {
isRunning = false;
}
}
}
java executors有哪些方法
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
(1) newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:
Java代码
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}
}
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
(2) newFixedThreadPool(项目用过)
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:
Java代码
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
(3) newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
Java代码
package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}
表示延迟3秒执行。
定期执行示例代码如下:
Java代码
package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
表示延迟1秒后每3秒执行一次。
(4) newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:
Java代码
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
结果依次输出,相当于顺序执行各个任务。
你可以使用JDK自带的监控工具来监控我们创建的线程数量,运行一个不终止的线程,创建指定量的线程,来观察:
工具目录:C:\Program Files\Java\jdk1.6.0_06\bin\jconsole.exe
运行程序做稍微修改:
Java代码
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newCachedThreadPool();
for (int i = 0; i 100; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
while(true) {
System.out.println(index);
Thread.sleep(10 * 1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
java线程池方法的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于java线程池的工作流程、java线程池方法的信息别忘了在本站进行查找喔。