「java队列执行」java队列用法
本篇文章给大家谈谈java队列执行,以及java队列用法对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
java中如何实现按队列执行任务
package com.tone.example;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.tone.task.TaskProperty;
import com.tone.task.TaskSignature;
import com.tone.task.impl.BasicTask;
import com.tone.task.runner.TaskRunner;
/**
* 任务队列示例程序
* @author zlf
*/
public class TaskExample {
private TaskRunner taskRunner;
/**
* 做任务队列的初始化工作
*/
@Before
public void init() {
// 获取任务运行器
taskRunner = TaskRunner.getInstance();
// 将任务运行器放入线程进行调度
Thread thread = new Thread(taskRunner);
thread.start();
}
/**
* 等待任务执行完成,并做最后的退出工作
*/
@After
public void exit() throws InterruptedException {
Thread.sleep(600);
System.exit(0);
}
/**
* 最简单的任务运行示例
*/
@Test
public void example1() {
// 添加任务到任务运行器
taskRunner.addTask(new BasicTask() {
@Override
public void run() {
System.out.println("This is running in task runner thread, and thread is " + Thread.currentThread());
}
});
}
/**
* 加入优先执行顺序的任务运行器
*/
@Test
public void example2() {
// 添加任务到任务运行器
taskRunner.addTask(new BasicTask(0) {
@Override
public void run() {
System.out.println("This is a normal task");
}
});
taskRunner.addTask(new BasicTask(-1) {
@Override
public void run() {
System.out.println("This is a task a bit high than normal");
}
});
}
/**
* 重复添加的任务只会运行第一个
*/
@Test
public void example3() {
// 添加任务到任务运行器
taskRunner.addTask(new BasicTask(TaskSignature.ONE) {
@Override
public void run() {
System.out.println("This is task one");
}
}, TaskProperty.NOT_REPEAT);
taskRunner.addTask(new BasicTask(TaskSignature.ONE) {
@Override
public void run() {
System.out.println("This is also task one");
}
}, TaskProperty.NOT_REPEAT);
}
/**
* 重复添加的任务只会运行最后一个
*/
@Test
public void example4() {
// 添加任务到任务运行器
taskRunner.addTask(new BasicTask(TaskSignature.ONE) {
@Override
public void run() {
System.out.println("This is task one");
}
}, TaskProperty.NOT_REPEAT_OVERRIDE);
taskRunner.addTask(new BasicTask(TaskSignature.ONE) {
@Override
public void run() {
System.out.println("This is also task one");
}
}, TaskProperty.NOT_REPEAT_OVERRIDE);
}
}
java 线程池 工作队列是如何工作的
使用线程池的好处
1、降低资源消耗
可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度
当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控
线程池的工作原理
首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的
1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
线程池饱和策略
这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:
AbortPolicy
为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
DiscardPolicy
直接抛弃,任务不执行,空方法
DiscardOldestPolicy
从队列里面抛弃head的一个任务,并再次execute 此task。
CallerRunsPolicy
在调用execute的线程里面执行此command,会阻塞入口
用户自定义拒绝策略(最常用)
实现RejectedExecutionHandler,并自己定义策略模式
下我们以ThreadPoolExecutor为例展示下线程池的工作流程图
1.jpg
2.jpg
1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇
3.jpg
关键方法源码分析
我们看看核心方法添加到线程池方法execute的源码如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current {@code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // {@code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if {@code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻译如下: // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程, // 如果能完成新线程创建exexute方法结束,成功提交任务 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻译如下: // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态 // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要 // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻译如下: // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池 // 已经达到饱和状态,所以reject这个他任务 // int c = ctl.get(); // 工作线程数小于核心线程数 if (workerCountOf(c) corePoolSize) { // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 如果工作线程数大于等于核心线程数 // 线程的的状态未RUNNING并且队列notfull if (isRunning(c) workQueue.offer(command)) { // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除 int recheck = ctl.get(); if (! isRunning(recheck) remove(command)) // 移除成功,拒绝该非运行的任务 reject(command); else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); } // 如果队列满了或者是非运行的任务都拒绝执行 else if (!addWorker(command, false)) reject(command); }
java队列实现异步执行
在整个思路上要调整一下
1、会有很多线程给一个队列上添加任务
2、有一个或者多个线程逐个执行队列的任务
考虑一下几点:
1、没有任务时,队列执行线程处于等待状态
2、添加任务时,激活队列执行线程,全部run起来,首先抢到任务的执行,其他全部wait
给个小例子吧
package org;
import java.util.LinkedList;
import java.util.List;
public class Queues {
public static ListTask queue = new LinkedListTask();
/**
* 假如 参数o 为任务
* @param o
*/
public static void add (Task t){
synchronized (Queues.queue) {
Queues.queue.add(t); //添加任务
Queues.queue.notifyAll();//激活该队列对应的执行线程,全部Run起来
}
}
static class Task{
public void test(){
System.out.println("我被执行了");
}
}
}
package org;
import java.util.List;
public class Exec implements Runnable{
@Override
public void run() {
while(true){
synchronized (Queues.queue) {
while(Queues.queue.isEmpty()){ //
try {
Queues.queue.wait(); //队列为空时,使线程处于等待状态
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("wait...");
}
Queues.Task t= Queues.queue.remove(0); //得到第一个
t.test(); //执行该任务
System.out.println("end");
}
}
}
public static void main(String[] args) {
Exec e = new Exec();
for (int i = 0; i 2; i++) {
new Thread(e).start(); //开始执行时,队列为空,处于等待状态
}
//上面开启两个线程执行队列中的任务,那就是先到先得了
//添加一个任务测试
Queues.Task t =new Queues.Task();
Queues.add(t); //执行该方法,激活所有对应队列,那两个线程就会开始执行啦
}
}
上面的就是很简单的例子了
java队列执行的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于java队列用法、java队列执行的信息别忘了在本站进行查找喔。
发布于:2022-11-22,除非注明,否则均为
原创文章,转载请注明出处。