「任务分发java」任务分发框架
今天给各位分享任务分发java的知识,其中也会对任务分发框架进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
本文目录一览:
- 1、求教Java Web高手一个简单的“Action概念”是什么?
- 2、分布式定时任务调度框架实践
- 3、「okhttp3 4.9.3 版本简单解析」
- 4、universal imageloader 怎么修改源码
- 5、如何将java类对象作为mapreduce中map函数的输入?
求教Java Web高手一个简单的“Action概念”是什么?
action在程序中扮演一个任务处理分发的角色,前台页面在需要请求后台时,需要说明要连接到后台的哪个接口,这个过程就是通过action来完成的。网上有很多搭建一个简单的增删改查的项目例子,跟着做一遍你会有更大的收获的。祝你学有所成。
分布式定时任务调度框架实践
分布式任务调度框架几乎是每个大型应用必备的工具,本文介绍了任务调度框架使用的需求背景和痛点,对业界普遍使用的开源分布式任务调度框架的使用进行了探究实践,并分析了这几种框架的优劣势和对自身业务的思考。
一、业务背景
1.1 为什么需要使用定时任务调度
(1)时间驱动处理场景: 整点发送优惠券,每天更新收益,每天刷新标签数据和人群数据。
(2)批量处理数据: 按月批量统计报表数据,批量更新短信状态,实时性要求不高。
(3)异步执行解耦: 活动状态刷新,异步执行离线查询,与内部逻辑解耦。
1.2 使用需求和痛点
(1)任务执行监控告警能力。
(2)任务可灵活动态配置,无需重启。
(3)业务透明,低耦合,配置精简,开发方便。
(4)易测试。
(5)高可用,无单点故障。
(6)任务不可重复执行,防止逻辑异常。
(7)大任务的分发并行处理能力。
二、开源框架实践与 探索
2.1 Java 原生 Timer 和
ScheduledExecutorService
2.1.1 Timer使用
Timer缺陷:
由于上述缺陷,尽量不要使用Timer, idea中也会明确提示,使用ScheduledThreadPoolExecutor替代Timer 。
2.1.2 ScheduledExecutorService使用
ScheduledExecutorService对于Timer的缺陷进行了修补,首先ScheduledExecutorService内部实现是ScheduledThreadPool线程池,可以支持多个任务并发执行。
对于某一个线程执行的任务出现异常,也会处理,不会影响其他线程任务的执行,另外ScheduledExecutorService是基于时间间隔的延迟,执行不会由于系统时间的改变发生变化。
当然,ScheduledExecutorService也有自己的局限性:只能根据任务的延迟来进行调度,无法满足基于绝对时间和日历调度的需求。
2.2 Spring Task
2.2.1 Spring Task 使用
spring task 是spring自主开发的轻量级定时任务框架,不需要依赖其他额外的包,配置较为简单。
此处使用注解配置
2.2.2 Spring Task缺陷
Spring Task 本身不支持持久化,也没有推出官方的分布式集群模式,只能靠开发者在业务应用中自己手动扩展实现,无法满足可视化,易配置的需求。
2.3 永远经典的 Quartz
2.3.1 基本介绍
Quartz框架是Java领域最著名的开源任务调度工具,也是目前事实上的定时任务标准,几乎全部的开源定时任务框架都是基于Quartz核心调度构建而成。
2.3.2 原理解析
核心组件和架构
关键概念
(1) Scheduler :任务调度器,是执行任务调度的控制器。本质上是一个计划调度容器,注册了全部Trigger和对应的JobDetail, 使用线程池作为任务运行的基础组件,提高任务执行效率。
(2) Trigger :触发器,用于定义任务调度的时间规则,告诉任务调度器什么时候触发任务,其中CronTrigger是基于cron表达式构建的功能强大的触发器。
(3) Calendar :日历特定时间点的集合。一个trigger可以包含多个Calendar,可用于排除或包含某些时间点。
(4) JobDetail :是一个可执行的工作,用来描述Job实现类及其它相关的静态信息,如Job的名称、监听器等相关信息。
(5) Job :任务执行接口,只有一个execute方法,用于执行真正的业务逻辑。
(6) JobStore :任务存储方式,主要有RAMJobStore和JDBCJobStore,RAMJobStore是存储在JVM的内存中,有丢失和数量受限的风险,JDBCJobStore是将任务信息持久化到数据库中,支持集群。
2.3.3 实践说明
(1)关于Quartz的基本使用
(2)业务使用要满足动态修改和重启不丢失, 一般需要使用数据库进行保存。
(3)组件化
(4)扩展
2.3.4 缺陷和不足
(1)需要把任务信息持久化到业务数据表,和业务有耦合。
(2)调度逻辑和执行逻辑并存于同一个项目中,在机器性能固定的情况下,业务和调度之间不可避免地会相互影响。
(3)quartz集群模式下,是通过数据库独占锁来唯一获取任务,任务执行并没有实现完善的负载均衡机制。
2.4 轻量级神器 XXL-JOB
2.4.1 基本介绍
XXL-JOB是一个轻量级分布式任务调度平台,主打特点是平台化,易部署,开发迅速、学习简单、轻量级、易扩展,代码仍在持续更新中。
主要提供了任务的动态配置管理、任务监控和统计报表以及调度日志几大功能模块,支持多种运行模式和路由策略,可基于对应执行器机器集群数量进行简单分片数据处理。
2.4.2 原理解析
2.1.0版本前核心调度模块都是基于quartz框架,2.1.0版本开始自研调度组件,移除quartz依赖 ,使用时间轮调度。
2.4.3 实践说明
详细配置和介绍参考官方文档。
2.4.3.1 demo使用:
@JobHandler(value="offlineTaskJobHandler") ,实现业务逻辑即可。(注:此次引入了dubbo,后文介绍)。
(滑动可查看)
示例2:分片广播任务。
(滑动可查看)
2.4.3.2 整合dubbo
(1)引入dubbo-spring-boot-starter和业务facade jar包依赖。
(滑动可查看)
(2)配置文件加入dubbo消费端配置(可根据环境定义多个配置文件,通过profile切换)。
(滑动可查看)
(3)代码中通过@Reference注入facade接口即可。
(滑动可查看)
(4)启动程序加入@EnableDubboConfiguration注解。
(滑动可查看)
2.4.4 任务可视化配置
内置了平台项目,方便了开发者对任务的管理和执行日志的监控,并提供了一些便于测试的功能。
2.4.5 扩展
(1)任务监控和报表的优化。
(2)任务报警方式的扩展,比如加入告警中心,提供内部消息,短信告警。
(3)对实际业务内部执行出现异常情况下的不同监控告警和重试策略。
2.5 高可用 Elastic-Job
2.5.1 基本介绍
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。
可惜的是已经两年没有迭代更新记录。
2.5.2 原理解析
2.5.3 实践说明
2.5.3.1 demo使用
(1)安装zookeeper,配置注册中心config,配置文件加入注册中心zk的配置。
(滑动可查看)
(滑动可查看)
(2)配置数据源config,并配置文件中加入数据源配置。
(滑动可查看)
(滑动可查看)
(3)配置事件config。
(滑动可查看)
(4)为了便于灵活配置不同的任务触发事件,加入ElasticSimpleJob注解。
(滑动可查看)
(5)对配置进行初始化。
(滑动可查看)
(6)实现 SimpleJob接口,按上文中方法整合dubbo, 完成业务逻辑。
(滑动可查看)
2.6 其余开源框架
(1) Saturn :Saturn是唯品会开源的一个分布式任务调度平台,在Elastic Job的基础上进行了改造。
(2) SIA-TASK :是宜信开源的分布式任务调度平台。
三、优劣势对比和业务场景适配思考
业务思考:
四、结语
对于并发场景不是特别高的系统来说,xxl-job配置部署简单易用,不需要引入多余的组件,同时提供了可视化的控制台,使用起来非常友好,是一个比较好的选择。希望直接利用开源分布式框架能力的系统,建议根据自身的情况来进行合适的选型。
附:参考文献
高可用架构
改变互联网的构建方式
「okhttp3 4.9.3 版本简单解析」
关于okhttp3的解析网上已经有非常多优秀的博文了,每每看完都觉得醍醐灌顶,豁然开朗。但等不了几天再回头看,还是跟当初一样陌生,究其根本原因,我们不过是在享受着别人的成果跟着别人的思路云阅读源码了一遍。okhttp从早期的Java版本到Kotlin版本一直不断优化升级,实现细节上也作出了调整。重读源码加上自身的思考能深刻地理解okhttp的实现原理。
从execute()开始,发现其实是一个接口中的方法(Call),这个很好理解根据官方的解释,Call其实是一个待执行的请求,并且这个请求所要的参数已经被准备好;当然既然是请求,那么它是可以被取消的。其次代表单个请求与响应流,因此不能够被再次执行。
Call接口具体的代码实现,重点关注同步执行方法execute()与异步请求enqueue():
Call作为接口,那么具体的实现细节则需要看它的实现类,RealCall作为Call的实现类,先找到对execute()的重写。
代码很少,逐步分析,首先对同步请求进行了检查-判断请求是否已经被执行过了;而这里使用的是并发包下的原子类CAS乐观锁,这里使用CAS比较算法目的也是为提升效率。其次是超时时间的判断,这个比较简单。在看callStart()的具体实现。上代码:
看名称猜测应该是事件监听之类的,可能是包括一些信息的记录与打印。回到RealCall类中,看看这个eventListener的作用到底是什么:
internal val eventListener: EventListener = client.eventListenerFactory.create(this)
继续向下可以知道这个EventListener是一个抽象类,而项目中其唯一实现类为LoggingEventListener,猜测还是有依据的,继续往下看:
实现类LoggingEventListener中对此方法的具体实现:
总结这个callStart()的作用,当同步请求或者异步请求被加到队列时,callStart()会被立即执行(在没有达到线程限制的情况下)记录请求开始的时间与请求的一些信息。如下:
代卖第四段#4 client.dispatcher.executed(this),看样子是在这里开启执行的,可实际真是如此嘛?回到OkHttpClient类中,看看这个分发器dispatcher到底是什么。
具体实现类Dispatcher代码(保留重要代码):
根据注释的信息可以知道,Dispatcher是处理异步请求的执行的策略,当然开发可以实现自己的策略。
知道了Dispatcher的作用,再回到client.dispatcher.executed(this),也即:
结合execute()与Dispatcher分析
到这里请求其实还没有真正的执行,只是在做一些前期的工作,回到Call接口中看看对方法同步请求方法execute()的说明:同步请求可以立即执行,阻塞直到返回正确的结果,或者报错结束。
到#4步执行后,return getResponseWithInterceptorChain() //#5这个方法才是请求一步步推进的核心。也是okhttp网络请求责任链的核心模块。
分析getResponseWithInterceptorChain()方法之前有必要看看OkHttpClient的构造参数,使用的Builder模式,参数很多,可配置化的东西很多,精简一下主要关注几个参数:
到这里有个疑问,这个添加自定义拦截器与添加自定义网络拦截器有什么区别呢?方法看上去是差不多的,查看官方的说明可以发现一些细节。文档中解释了Application Interceptor与Network Interceptors的细微差别。先回到RealCall中查看getResponseWithInterceptorChain()是如何对拦截器结合组装的:
看#1与#2分别对应添加的自定义拦截器与自定义网络拦截器的位置,自定义拦截器是拦截器链的链头,而自定义网络拦截器在ConnectInterceptor拦截器与CallServerInterceptor拦截器之间被添加。总结一下:
Don’t need to worry about intermediate responses like redirects and retries.
Are always invoked once, even if the HTTP response is served from the cache.
Observe the application’s original intent. Unconcerned with OkHttp-injected headers like If-None-Match.
Permitted to short-circuit and not call Chain.proceed().
Permitted to retry and make multiple calls to Chain.proceed().
Can adjust Call timeouts using withConnectTimeout, withReadTimeout, withWriteTimeout.
Able to operate on intermediate responses like redirects and retries.
Not invoked for cached responses that short-circuit the network.
Observe the data just as it will be transmitted over the network.
Access to the Connection that carries the request.
综上可以得出整个链的顺序结构,如果都包含自定义拦截器与自定义网络拦截器,则为自定义拦截器-RetryAndFollowUpInterceptor-BridgeInterceptor-CacheInterceptor-ConnectInterceptor-自定义网络拦截器-CallServerInterceptor;那么链是如何按照顺序依次执行的呢?okhttp在这里设计比较精妙,在构造RealInterceptorChain对象时带入index信息,这个index记录的就是单个拦截器链的位置信息,而RealInterceptorChain.proceed(request: Request)通过index++自增一步步执行责任链一直到链尾。
简单的分析推进过程:
1.RealInterceptorChain的构造参数中携带了index的信息,index++自增通过proceed方法不断执行。
2.拦截器统一实现Interceptor接口,接口中fun proceed(request: Request): Response保证了链式链接。当然拦截器的顺序是按照一定的规则排列的,逐个分析。
1.重试拦截器规定默认的重试次数为20次
2.以response = realChain.proceed(request)为分界点,包括其他的拦截器,在责任链传递之前所做的工作都是前序工作,然后将request下发到下一个拦截器。
3.response = realChain.proceed(request)后的代码逻辑为后续工作,即拿到上个拦截器的response结果,有点递归的意思,按照责任链的执行一直到最后一个拦截器获得的结果依次上抛每个拦截器处理这个response完成一些后序工作。
4.当然并不是每个请求都会走完整个链,如CacheInterceptor当开启了缓存(存在缓存)拿到了缓存的response那么之后的拦截器就不会在继续传递。
1.桥接拦截器主要对请求的Hader的信息的补充,包括内容长度等。
2.传递请求到下一个链,等待返回的response信息。
3.后序的操作包括Cookie、gzip压缩信息,User-Agent等信息的补充。
1.缓存拦截器默认没有被开启,需要在调用时指定缓存的目录,内部基于DiskLruCache实现了磁盘缓存。
2.当缓存开启,且命中缓存,那么链的调用不会再继续向下传递(此时已经拿到了response)直接进行后序的操作。
3.如果未命中,则会继续传递到下一个链也即是ConnectInterceptor。
1.建立与目标的服务器的TCP或者TCP-TLS的链接。
2.与之前的拦截器不同,前面的拦截器的前序操作基于调用方法realChain.proceed()之前,但是ConnectInterceptor 没有后序操作,下发到下一个拦截器 。
1.实质上是请求与I/O操作,将请求的数据写入到Socket中。
2.从Socket读取响应的数据 TCP/TCP-TLS对应的端口 ,对于I/O操作基于的是okio,而okhttp的高效请求同样离不开okio的支持。
3.拿到数据reponse返回到之前包含有后序操作的拦截器,但ConnectInterceptor除外,ConnectInterceptor是没有后续操作的。
整个拦截器流程图如下:
1.排除极端的情况,System.exit()或者其他, finally 块必然执行,不论发生异常与否,也不论在 finally 之前是否有return。
2.不管在 try 块中是否包含 return, finally 块总是在 return 之前执行。
3.如果 finally 块中有 return ,那么 try 块和 catch 块中的 return 就没有执行机会了。
Tip:第二条的结论很重要,回到execute()方法,dispatcher.finished(this)在结果response结果返回之前执行,看finished()具体实现。
1.#1方法一,calls.remove(call)返回为 true ,也即是这个同步请求被从runningSyncCalls中移除释放;所以idleCallback为空。
2.#3很显然asyncCall的结果为空,没有异步请求,在看#4具体实现,runningSyncCalls的size为1。则isRunning的结果为 true 。idleCallback.run()不会被执行,并且idleCallback其实也是为空。
1.从 OkHttpClient().newCall(request).execute() 开启同步请求任务。
2.得到的 RealCall 对象作为 Call 的唯一实现类,其中同步方法 execute() 是阻塞的,调用到会立即执行 阻塞 到有结果返回,或者发生错误 error 被打断阻塞。
3. RealCall 中同步 execute() 请求方法被执行,而此时 OkHttpClient 实例中的异步任务分发器 Dispatcher 会将请求的实例 RealCall 添加到双端队列 runningSyncCalls 中去。
4.通过 RealCall 中的方法 getResponseWithInterceptorChain() 开启请求拦截器的责任链,将请求逐一下发,通过持有 index 并自增操作,其次除 ConnectInterceptor 与链尾 CallServerInterceptor 其余默认拦截器均有以 chain.proceed(request) 为分界点的前序与后序操作,拿到 response 后依次处理后序操作。
5.最终返回结果 response 之前,对进行中的同步任务做了移除队列的操作也即 finally 中 client.dispatcher.finished(this) 方法,最终得到的结果 response 返回到客户端,至此整个 同步请求 流程就结束了。
Github
Square
universal imageloader 怎么修改源码
Android Universal Image Loader 源码分析
本文为 Android 开源项目源码解析 中 Android Universal Image Loader 部分
项目地址:Android-Universal-Image-Loader,分析的版本:eb794c3,Demo 地址:UIL Demo
分析者:huxian99,校对者:Grumoon、Trinea,校对状态:完成
1. 功能介绍
1.1 Android Universal Image Loader
Android Universal Image Loader 是一个强大的、可高度定制的图片缓存,本文简称为UIL。
简单的说 UIL 就做了一件事——获取图片并显示在相应的控件上。
1.2 基本使用
1.2.1 初始化
添加完依赖后在Application或Activity中初始化ImageLoader,如下:
public class YourApplication extends Application {
@Override
public void onCreate() {
super.onCreate();
ImageLoaderConfiguration configuration = new ImageLoaderConfiguration.Builder(this)
// 添加你的配置需求
.build();
ImageLoader.getInstance().init(configuration);
}
}
其中 configuration 表示ImageLoader的配置信息,可包括图片最大尺寸、线程池、缓存、下载器、解码器等等。
1.2.2 Manifest 配置
manifest
uses-permission android:name="android.permission.INTERNET" /
uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /
application
android:name=".YourApplication"
……
……
/application
/manifest
添加网络权限。如果允许磁盘缓存,需要添加写外设的权限。
1.2.3 下载显示图片
下载图片,解析为 Bitmap 并在 ImageView 中显示。
imageLoader.displayImage(imageUri, imageView);
下载图片,解析为 Bitmap 传递给回调接口。
imageLoader.loadImage(imageUri, new SimpleImageLoadingListener() {
@Override
public void onLoadingComplete(String imageUri, View view, Bitmap loadedImage) {
// 图片处理
}
});
以上是简单使用,更复杂 API 见本文详细设计。
1.3 特点
可配置度高。支持任务线程池、下载器、解码器、内存及磁盘缓存、显示选项等等的配置。
包含内存缓存和磁盘缓存两级缓存。
支持多线程,支持异步和同步加载。
支持多种缓存算法、下载进度监听、ListView 图片错乱解决等。
2. 总体设计
2.1. 总体设计图
上面是 UIL 的总体设计图。整个库分为ImageLoaderEngine,Cache及ImageDownloader,ImageDecoder,BitmapDisplayer,BitmapProcessor五大模块,其中Cache分为MemoryCache和DiskCache两部分。
简单的讲就是ImageLoader收到加载及显示图片的任务,并将它交给ImageLoaderEngine,ImageLoaderEngine分发任务到具体线程池去执行,任务通过Cache及ImageDownloader获取图片,中间可能经过BitmapProcessor和ImageDecoder处理,最终转换为Bitmap交给BitmapDisplayer在ImageAware中显示。
2.2. UIL 中的概念
简单介绍一些概念,在4. 详细设计中会仔细介绍。
ImageLoaderEngine:任务分发器,负责分发LoadAndDisplayImageTask和ProcessAndDisplayImageTask给具体的线程池去执行,本文中也称其为engine,具体参考4.2.6 ImageLoaderEngine.java。
ImageAware:显示图片的对象,可以是ImageView等,具体参考4.2.9 ImageAware.java。
ImageDownloader:图片下载器,负责从图片的各个来源获取输入流, 具体参考4.2.22 ImageDownloader.java。
Cache:图片缓存,分为MemoryCache和DiskCache两部分。
MemoryCache:内存图片缓存,可向内存缓存缓存图片或从内存缓存读取图片,具体参考4.2.24 MemoryCache.java。
DiskCache:本地图片缓存,可向本地磁盘缓存保存图片或从本地磁盘读取图片,具体参考4.2.38 DiskCache.java。
ImageDecoder:图片解码器,负责将图片输入流InputStream转换为Bitmap对象, 具体参考4.2.53 ImageDecoder.java。
BitmapProcessor:图片处理器,负责从缓存读取或写入前对图片进行处理。具体参考4.2.61 BitmapProcessor.java。
BitmapDisplayer:将Bitmap对象显示在相应的控件ImageAware上, 具体参考4.2.56 BitmapDisplayer.java。
LoadAndDisplayImageTask:用于加载并显示图片的任务, 具体参考4.2.20 LoadAndDisplayImageTask.java。
ProcessAndDisplayImageTask:用于处理并显示图片的任务, 具体参考4.2.19 ProcessAndDisplayImageTask.java。
DisplayBitmapTask:用于显示图片的任务, 具体参考4.2.18 DisplayBitmapTask.java。
3. 流程图
上图为图片加载及显示流程图,在uil库中给出,这里用中文重新画出。
4. 详细设计
4.1 类关系图
4.2 核心类功能介绍
4.2.1 ImageLoader.java
图片加载器,对外的主要 API,采取了单例模式,用于图片的加载和显示。
主要函数:
(1). getInstance()
得到ImageLoader的单例。通过双层是否为 null 判断提高性能。
(2). init(ImageLoaderConfiguration configuration)
初始化配置参数,参数configuration为ImageLoader的配置信息,包括图片最大尺寸、任务线程池、磁盘缓存、下载器、解码器等等。
实现中会初始化ImageLoaderEngine engine属性,该属性为任务分发器。
(3). displayImage(String uri, ImageAware imageAware, DisplayImageOptions options, ImageLoadingListener listener, ImageLoadingProgressListener progressListener)
加载并显示图片或加载并执行回调接口。ImageLoader 加载图片主要分为三类接口:
displayImage(…) 表示异步加载并显示图片到对应的ImageAware上。
loadImage(…) 表示异步加载图片并执行回调接口。
loadImageSync(…) 表示同步加载图片。
以上三类接口最终都会调用到这个函数进行图片加载。函数参数解释如下:
uri: 图片的 uri。uri 支持多种来源的图片,包括 http、https、file、content、assets、drawable 及自定义,具体介绍可见ImageDownloader。
imageAware: 一个接口,表示需要加载图片的对象,可包装 View。
options: 图片显示的配置项。比如加载前、加载中、加载失败应该显示的占位图片,图片是否需要在磁盘缓存,是否需要在内存缓存等。
listener: 图片加载各种时刻的回调接口,包括开始加载、加载失败、加载成功、取消加载四个时刻的回调函数。
progressListener: 图片加载进度的回调接口。
函数流程图如下:
4.2.2 ImageLoaderConfiguration.java
ImageLoader的配置信息,包括图片最大尺寸、线程池、缓存、下载器、解码器等等。
主要属性:
(1). Resources resources
程序本地资源访问器,用于加载DisplayImageOptions中设置的一些 App 中图片资源。
(2). int maxImageWidthForMemoryCache
内存缓存的图片最大宽度。
(3). int maxImageHeightForMemoryCache
内存缓存的图片最大高度。
(4). int maxImageWidthForDiskCache
磁盘缓存的图片最大宽度。
(5). int maxImageHeightForDiskCache
磁盘缓存的图片最大高度。
(6). BitmapProcessor processorForDiskCache
图片处理器,用于处理从磁盘缓存中读取到的图片。
(7). Executor taskExecutor
ImageLoaderEngine中用于执行从源获取图片任务的 Executor。
(18). Executor taskExecutorForCachedImages
ImageLoaderEngine中用于执行从缓存获取图片任务的 Executor。
(19). boolean customExecutor
用户是否自定义了上面的 taskExecutor。
(20). boolean customExecutorForCachedImages
用户是否自定义了上面的 taskExecutorForCachedImages。
(21). int threadPoolSize
上面两个默认线程池的核心池大小,即最大并发数。
(22). int threadPriority
上面两个默认线程池的线程优先级。
(23). QueueProcessingType tasksProcessingType
上面两个默认线程池的线程队列类型。目前只有 FIFO, LIFO 两种可供选择。
(24). MemoryCache memoryCache
图片内存缓存。
(25). DiskCache diskCache
图片磁盘缓存,一般放在 SD 卡。
(26). ImageDownloader downloader
图片下载器。
(27). ImageDecoder decoder
图片解码器,内部可使用我们常用的BitmapFactory.decode(…)将图片资源解码成Bitmap对象。
(28). DisplayImageOptions defaultDisplayImageOptions
图片显示的配置项。比如加载前、加载中、加载失败应该显示的占位图片,图片是否需要在磁盘缓存,是否需要在内存缓存等。
(29). ImageDownloader networkDeniedDownloader
不允许访问网络的图片下载器。
(30). ImageDownloader slowNetworkDownloader
慢网络情况下的图片下载器。
4.2.3 ImageLoaderConfiguration.Builder.java 静态内部类
Builder 模式,用于构造参数繁多的ImageLoaderConfiguration。
其属性与ImageLoaderConfiguration类似,函数多是属性设置函数。
主要函数及含义:
(1). build()
按照配置,生成 ImageLoaderConfiguration。代码如下:
public ImageLoaderConfiguration build() {
initEmptyFieldsWithDefaultValues();
return new ImageLoaderConfiguration(this);
}
(2). initEmptyFieldsWithDefaultValues()
初始化值为null的属性。若用户没有配置相关项,UIL 会通过调用DefaultConfigurationFactory中的函数返回一个默认值当配置。
taskExecutorForCachedImages、taskExecutor及ImageLoaderEngine的taskDistributor的默认值如下:
parameters
taskDistributor
taskExecutorForCachedImages/taskExecutor
corePoolSize 0 3
maximumPoolSize Integer.MAX_VALUE 3
keepAliveTime 60 0
unit SECONDS MILLISECONDS
workQueue SynchronousQueue LIFOLinkedBlockingDeque / LinkedBlockingQueue
priority 5 3
diskCacheFileNameGenerator默认值为HashCodeFileNameGenerator。
memoryCache默认值为LruMemoryCache。如果内存缓存不允许缓存一张图片的多个尺寸,则用FuzzyKeyMemoryCache做封装,同一个图片新的尺寸会覆盖缓存中该图片老的尺寸。
diskCache默认值与diskCacheSize和diskCacheFileCount值有关,如果他们有一个大于 0,则默认为LruDiskCache,否则使用无大小限制的UnlimitedDiskCache。
downloader默认值为BaseImageDownloader。
decoder默认值为BaseImageDecoder。
详细及其他属性默认值请到DefaultConfigurationFactory中查看。
(3). denyCacheImageMultipleSizesInMemory()
设置内存缓存不允许缓存一张图片的多个尺寸,默认允许。
后面会讲到 View 的 getWidth() 在初始化前后的不同值与这个设置的关系。
(4). diskCacheSize(int maxCacheSize)
设置磁盘缓存的最大字节数,如果大于 0 或者下面的maxFileCount大于 0,默认的DiskCache会用LruDiskCache,否则使用无大小限制的UnlimitedDiskCache。
(5). diskCacheFileCount(int maxFileCount)
设置磁盘缓存文件夹下最大文件数,如果大于 0 或者上面的maxCacheSize大于 0,默认的DiskCache会用LruDiskCache,否则使用无大小限制的UnlimitedDiskCache。
4.2.4 ImageLoaderConfiguration.NetworkDeniedImageDownloader.java 静态内部类
不允许访问网络的图片下载器,实现了ImageDownloader接口。
实现也比较简单,包装一个ImageDownloader对象,通过在 getStream(…) 函数中禁止 Http 和 Https Scheme 禁止网络访问,如下:
@Override
public InputStream getStream(String imageUri, Object extra) throws IOException {
switch (Scheme.ofUri(imageUri)) {
case HTTP:
case HTTPS:
throw new IllegalStateException();
default:
return wrappedDownloader.getStream(imageUri, extra);
}
}
4.2.5 ImageLoaderConfiguration.SlowNetworkImageDownloader.java 静态内部类
慢网络情况下的图片下载器,实现了ImageDownloader接口。
通过包装一个ImageDownloader对象实现,在 getStream(…) 函数中当 Scheme 为 Http 和 Https 时,用FlushedInputStream代替InputStream处理慢网络情况,具体见后面FlushedInputStream的介绍。
4.2.6 ImageLoaderEngine.java
LoadAndDisplayImageTask和ProcessAndDisplayImageTask任务分发器,负责分发任务给具体的线程池。
主要属性:
(1). ImageLoaderConfiguration configuration
ImageLoader的配置信息,可包括图片最大尺寸、线程池、缓存、下载器、解码器等等。
(2). Executor taskExecutor
用于执行从源获取图片任务的 Executor,为configuration中的 taskExecutor,如果为null,则会调用DefaultConfigurationFactory.createExecutor(…)根据配置返回一个默认的线程池。
(3). Executor taskExecutorForCachedImages
用于执行从缓存获取图片任务的 Executor,为configuration中的 taskExecutorForCachedImages,如果为null,则会调用DefaultConfigurationFactory.createExecutor(…)根据配置返回一个默认的线程池。
(4). Executor taskDistributor
任务分发线程池,任务指LoadAndDisplayImageTask和ProcessAndDisplayImageTask,因为只需要分发给上面的两个 Executor 去执行任务,不存在较耗时或阻塞操作,所以用无并发数(Int 最大值)限制的线程池即可。
(5). Map cacheKeysForImageAwares
ImageAware与内存缓存 key 对应的 map,key 为ImageAware的 id,value 为内存缓存的 key。
(6). Map uriLocks
图片正在加载的重入锁 map,key 为图片的 uri,value 为标识其正在加载的重入锁。
(7). AtomicBoolean paused
是否被暂停。如果为true,则所有新的加载或显示任务都会等待直到取消暂停(为false)。
(8). AtomicBoolean networkDenied
是否不允许访问网络,如果为true,通过ImageLoadingListener.onLoadingFailed(…)获取图片,则所有不在缓存中需要网络访问的请求都会失败,返回失败原因为网络访问被禁止。
(9). AtomicBoolean slowNetwork
是否是慢网络情况,如果为true,则自动调用SlowNetworkImageDownloader下载图片。
(10). Object pauseLock
如何将java类对象作为mapreduce中map函数的输入?
1.首先介绍一下wordcount 早mapreduce框架中的 对应关系
大家都知道 mapreduce 分为 map 和reduce 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 reduce;
大家都明白 map接受一个参数,经过map处理后,将处理结果作为reduce的入参分发给reduce,然后在reduce中统计了word 的数量,最终输出到输出结果;
但是初看遇到的问题:
一、map的输入参数是个 Text之类的 对象,并不是 file对象
二、reduce中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在reduce的时候才判断的
三、map过程到底做了什么,reduce过程到底做了什么?为什么它能够做到多个map多个reduce?
一、
1. 怎么将 文件参数 传递 到 job中呢?
在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path);
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。
我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了
ListInputSplitgetSplits(JobContext job)
Generate the list of files and make them into FileSplits.
具体实现参考 FileInputFormat getSplits 方法:
上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize=blockSize=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:mapreduce.input.num.files。
二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?
我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:
RecordReader:
RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。
可以看到接口中有:
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
public abstract float getProgress() throws IOException, InterruptedException;
public abstract void close() throws IOException;
FileInputFormatK,V
Direct Known Subclasses:
CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat
对于 wordcount 测试用了 NLineInputFormat和 TextInputFormat 实现类
在 InputFormat 构建一个 RecordReader 出来,然后调用RecordReader initialize 的方法,初始化RecordReader 对象
那么 到底 Map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,
下面继续看看这些RecordReader是如何被MapReduce框架使用的
终于 说道 Map了 ,我么如果要实现Map 那么 一定要继承 Mapper这个类
public abstract class Context
implements MapContextKEYIN,VALUEIN,KEYOUT,VALUEOUT {
}
protected void setup(Context context) throws IOException, InterruptedException
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }
protected void cleanup(Context context ) throws IOException, InterruptedException { }
public void run(Context context) throws IOException, InterruptedException { }
我们写MapReduce程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);
最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。
我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
RecordReaderKEYIN,VALUEIN reader,
RecordWriterKEYOUT,VALUEOUT writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
RecordReader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?
我们可以想象 这里 应该被框架调用的可能性比较大了,那么mapreduce 框架是怎么分别来调用map和reduce呢?
还以为分析完map就完事了,才发现这里仅仅是做了mapreduce 框架调用前的一些准备工作,
还是继续分析 下 mapreduce 框架调用吧:
1.在 job提交 任务之后 首先由jobtrack 分发任务,
在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runNewMapper
在这个方法中调用了 MapContextImpl, 至此 这个map 和框架就可以联系起来了。
关于任务分发java和任务分发框架的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。