「java队列执行」java队列用法

博主:adminadmin 2022-11-22 07:48:05 44

本篇文章给大家谈谈java队列执行,以及java队列用法对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

java中如何实现按队列执行任务

package com.tone.example;

import org.junit.After;

import org.junit.Before;

import org.junit.Test;

import com.tone.task.TaskProperty;

import com.tone.task.TaskSignature;

import com.tone.task.impl.BasicTask;

import com.tone.task.runner.TaskRunner;

/**

* 任务队列示例程序

* @author zlf

*/

public class TaskExample {

private TaskRunner taskRunner;

/**

* 做任务队列的初始化工作

*/

@Before

public void init() {

// 获取任务运行器

taskRunner = TaskRunner.getInstance();

// 将任务运行器放入线程进行调度

Thread thread = new Thread(taskRunner);

thread.start();

}

/**

* 等待任务执行完成,并做最后的退出工作

*/

@After

public void exit() throws InterruptedException {

Thread.sleep(600);

System.exit(0);

}

/**

* 最简单的任务运行示例

*/

@Test

public void example1() {

// 添加任务到任务运行器

taskRunner.addTask(new BasicTask() {

@Override

public void run() {

System.out.println("This is running in task runner thread, and thread is " + Thread.currentThread());

}

});

}

/**

* 加入优先执行顺序的任务运行器

*/

@Test

public void example2() {

// 添加任务到任务运行器

taskRunner.addTask(new BasicTask(0) {

@Override

public void run() {

System.out.println("This is a normal task");

}

});

taskRunner.addTask(new BasicTask(-1) {

@Override

public void run() {

System.out.println("This is a task a bit high than normal");

}

});

}

/**

* 重复添加的任务只会运行第一个

*/

@Test

public void example3() {

// 添加任务到任务运行器

taskRunner.addTask(new BasicTask(TaskSignature.ONE) {

@Override

public void run() {

System.out.println("This is task one");

}

}, TaskProperty.NOT_REPEAT);

taskRunner.addTask(new BasicTask(TaskSignature.ONE) {

@Override

public void run() {

System.out.println("This is also task one");

}

}, TaskProperty.NOT_REPEAT);

}

/**

* 重复添加的任务只会运行最后一个

*/

@Test

public void example4() {

// 添加任务到任务运行器

taskRunner.addTask(new BasicTask(TaskSignature.ONE) {

@Override

public void run() {

System.out.println("This is task one");

}

}, TaskProperty.NOT_REPEAT_OVERRIDE);

taskRunner.addTask(new BasicTask(TaskSignature.ONE) {

@Override

public void run() {

System.out.println("This is also task one");

}

}, TaskProperty.NOT_REPEAT_OVERRIDE);

}

}

java 线程池 工作队列是如何工作的

使用线程池的好处

1、降低资源消耗

可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

2、提高响应速度

当任务到达时,任务可以不需要等到线程创建就能立即执行。

3、提高线程的可管理性

线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

线程池的工作原理

首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的

1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。

2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步

3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

线程池饱和策略

这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:

AbortPolicy

为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。

DiscardPolicy

直接抛弃,任务不执行,空方法

DiscardOldestPolicy

从队列里面抛弃head的一个任务,并再次execute 此task。

CallerRunsPolicy

在调用execute的线程里面执行此command,会阻塞入口

用户自定义拒绝策略(最常用)

实现RejectedExecutionHandler,并自己定义策略模式

下我们以ThreadPoolExecutor为例展示下线程池的工作流程图

1.jpg

2.jpg

1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇

3.jpg

关键方法源码分析

我们看看核心方法添加到线程池方法execute的源码如下:

//     //Executes the given task sometime in the future.  The task     //may execute in a new thread or in an existing pooled thread.     //     // If the task cannot be submitted for execution, either because this     // executor has been shutdown or because its capacity has been reached,     // the task is handled by the current {@code RejectedExecutionHandler}.     //     // @param command the task to execute     // @throws RejectedExecutionException at discretion of     //         {@code RejectedExecutionHandler}, if the task     //         cannot be accepted for execution     // @throws NullPointerException if {@code command} is null     //    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        //         // Proceed in 3 steps:         //         // 1. If fewer than corePoolSize threads are running, try to         // start a new thread with the given command as its first         // task.  The call to addWorker atomically checks runState and         // workerCount, and so prevents false alarms that would add         // threads when it shouldn't, by returning false.         // 翻译如下:         // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,         // 如果能完成新线程创建exexute方法结束,成功提交任务         // 2. If a task can be successfully queued, then we still need         // to double-check whether we should have added a thread         // (because existing ones died since last checking) or that         // the pool shut down since entry into this method. So we         // recheck state and if necessary roll back the enqueuing if         // stopped, or start a new thread if there are none.         // 翻译如下:         // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态         // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要         // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;         // 3. If we cannot queue task, then we try to add a new         // thread.  If it fails, we know we are shut down or saturated         // and so reject the task.         // 翻译如下:         // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池         // 已经达到饱和状态,所以reject这个他任务         //        int c = ctl.get();        // 工作线程数小于核心线程数        if (workerCountOf(c) corePoolSize) {            // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize            if (addWorker(command, true))                return;            c = ctl.get();        }        // 如果工作线程数大于等于核心线程数        // 线程的的状态未RUNNING并且队列notfull        if (isRunning(c) workQueue.offer(command)) {            // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除            int recheck = ctl.get();            if (! isRunning(recheck) remove(command))                // 移除成功,拒绝该非运行的任务                reject(command);            else if (workerCountOf(recheck) == 0)                // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务                addWorker(null, false);        }        // 如果队列满了或者是非运行的任务都拒绝执行        else if (!addWorker(command, false))            reject(command);    }

「java队列执行」java队列用法

java队列实现异步执行

在整个思路上要调整一下

1、会有很多线程给一个队列上添加任务

2、有一个或者多个线程逐个执行队列的任务

考虑一下几点:

1、没有任务时,队列执行线程处于等待状态

2、添加任务时,激活队列执行线程,全部run起来,首先抢到任务的执行,其他全部wait

给个小例子吧

package org;

import java.util.LinkedList;

import java.util.List;

public class Queues {

public static ListTask queue = new LinkedListTask();

/**

 * 假如 参数o 为任务

 * @param o

 */

public static void add (Task t){

synchronized (Queues.queue) {

Queues.queue.add(t); //添加任务

Queues.queue.notifyAll();//激活该队列对应的执行线程,全部Run起来

}

}

static class Task{

public void test(){

System.out.println("我被执行了");

}

}

}

package org;

import java.util.List;

public class Exec implements Runnable{

@Override

public void run() {

while(true){

synchronized (Queues.queue) {

while(Queues.queue.isEmpty()){ //

try {

Queues.queue.wait(); //队列为空时,使线程处于等待状态

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("wait...");

}

Queues.Task t= Queues.queue.remove(0); //得到第一个

t.test(); //执行该任务

System.out.println("end");

}

}

}

public static void main(String[] args) {

Exec e = new Exec();

for (int i = 0; i  2; i++) {

new Thread(e).start(); //开始执行时,队列为空,处于等待状态

}

//上面开启两个线程执行队列中的任务,那就是先到先得了

//添加一个任务测试

Queues.Task t =new Queues.Task();

Queues.add(t); //执行该方法,激活所有对应队列,那两个线程就会开始执行啦

}

}

上面的就是很简单的例子了

java队列执行的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于java队列用法、java队列执行的信息别忘了在本站进行查找喔。

The End

发布于:2022-11-22,除非注明,否则均为首码项目网原创文章,转载请注明出处。