「JAVA任务流」Java任务流程怎么设计

博主:adminadmin 2022-12-15 14:54:06 65

本篇文章给大家谈谈JAVA任务流,以及Java任务流程怎么设计对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

调度工具(ETL+任务流)

kettle是一个ETL工具,ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程)。

kettle中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。

所以他的重心是用于数据

oozie是一个工作流,Oozie工作流是放置在控制依赖DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop的Map/Reduce作业、Pig作业等),其中指定了动作执行的顺序。

oozie工作流中是有数据流动的,但是重心是在于工作流的定义。

二者虽然都有相关功能及数据的流动,但是其实用途是不一样的。

查看帮助

列举出所有linux上的数据库

列举出所有Window上的数据库

查看数据库下的所有表

(1)确定mysql服务启动正常

查询控制端口和查询进程来确定,一下两种办法可以确认mysql是否在启动状态

办法1:查询端口

MySQL监控的TCP的3306端口,如果显示3306,证明MySQL服务在运行中

办法二:查询进程

可以看见mysql的进程

没有指定数据导入到哪个目录,默认是/user/root/表名

原因:

如果表中有主键,m的值可以设置大于1的值;如果没有主键只能将m值设置成为1;或者要将m值大于1,需要使用--split-by指定一个字段

设置了-m 1 说明只有一个maptask执行数据导入,默认是4个maptask执行导入操作,但是必须指定一个列来作为划分依据

导入数据到指定目录

在导入表数据到HDFS使用Sqoop导入工具,我们可以指定目标目录。使用参数 --target-dir来指定导出目的地,使用参数—delete-target-dir来判断导出目录是否存在,如果存在就删掉

查询导入

提示:must contain '$CONDITIONS' in WHERE clause。

where id =1 匹配条件

$CONDITIONS:传递作用。

如果 query 后使用的是双引号,则 $CONDITIONS前必须加转义符,防止 shell 识别为自己的变量。

--query时不能使用--table一起使用

需要指定--target-dir路径

导入到hdfs指定目录并指定要求

数据导出储存方式(数据存储文件格式---( textfil parquet)--as-textfileImports data as plain text (default)--as-parquetfile Imports data to Parquet Files)

导入表数据子集到HDFS

sqoop导入blob数据到hive

对于CLOB,如xml文本,sqoop可以迁移到Hive表,对应字段存储为字符类型。

对于BLOB,如jpg图片,sqoop无法直接迁移到Hive表,只能先迁移到HDFS路径,然后再使用Hive命令加载到Hive表。迁移到HDFS后BLOB字段存储为16进制形式。

2.1.3导入关系表到Hive

第一步:导入需要的jar包

将我们mysql表当中的数据直接导入到hive表中的话,我们需要将hive的一个叫做hive-exec-1.1.0-cdh5.14.0.jar的jar包拷贝到sqoop的lib目录下

第二步:开始导入

导入关系表到hive并自动创建hive表

们也可以通过命令来将我们的mysql的表直接导入到hive表当中去

通过这个命令,我们可以直接将我们mysql表当中的数据以及表结构一起倒入到hive当中去

--incremental 增量模式。

append id 是获取一个某一列的某个值。

lastmodified “2016-12-15 15:47:35” 获取某个时间后修改的所有数据

-append 附加模式

-merge-key id 合并模式

--check-column 用来指定一些列,可以去指定多个列;通常的是指定主键id

--last -value 从哪个值开始增量

==注意:增量导入的时候,一定不能加参数--delete-target-dir 否则会报错==

第一种增量导入方式(不常用)

1.Append方式

使用场景:有个订单表,里面每个订单有一个唯一标识的自增列id,在关系型数据库中以主键的形式存在。之前已经将id在0-1000之间的编号的订单导入到HDFS 中;如果在产生新的订单,此时我们只需指定incremental参数为append,--last-value参数为1000即可,表示只从id大于1000后开始导入。

(1)创建一个MySQL表

(2)创建一个hive表(表结构与mysql一致)

注意:

append 模式不支持写入到hive表中

2.lastModify方式

此方式要求原有表有time字段,它能指定一个时间戳,让sqoop把该时间戳之后的数据导入到HDFS;因为后续订单可能状体会变化,变化后time字段时间戳也会变化,此时sqoop依然会将相同状态更改后的订单导入HDFS,当然我们可以只当merge-key参数为order-id,表示将后续新的记录和原有记录合并。

# 将时间列大于等于阈值的数据增量导入HDFS

使用 lastmodified 方式导入数据,要指定增量数据是要 --append(追加)还是要 --merge-key(合并)last-value 指定的值是会包含于增量导入的数据中。

第二种增量导入方式(推荐)

==通过where条件选取数据更加精准==

2.1.5从RDBMS到HBase

会报错

原因:sqoop1.4.6 只支持 HBase1.0.1 之前的版本的自动创建 HBase 表的功能。

解决方案:手动创建 HBase 表

导出前,目标表必须存在与目标数据库中

默认操作是将文件中的数据使用insert语句插入到表中

数据是在HDFS当中的如下目录/sqoop/emp,数据内容如下

第一步:创建MySQL表

第二步:执行导出命令

通过export来实现数据的导出,将hdfs的数据导出到mysql当中去

全量导出

增量导出

更新导出

总结:

参数介绍

--update-key 后面也可以接多个关键字列名,可以使用逗号隔开,Sqoop将会匹配多个关键字后再执行更新操作。

--export-dir 参数配合--table或者--call参数使用,指定了HDFS上需要将数据导入到MySQL中的文件集目录。

--update-mode updateonly和allowinsert。 默认模式为updateonly,如果指定--update-mode模式为allowinsert,可以将目标数据库中原来不存在的数据也导入到数据库表中。即将存在的数据更新,不存在数据插入。

组合测试及说明

1、当指定update-key,且关系型数据库表存在主键时:

A、allowinsert模式时,为更新目标数据库表存的内容,并且原来不存在的数据也导入到数据库表;

B、updateonly模式时,为更新目标数据库表存的内容,并且原来不存在的数据也不导入到数据库表;

2、当指定update-key,且关系型数据库表不存在主键时:

A、allowinsert模式时,为全部数据追加导入到数据库表;

B、updateonly模式时,为更新目标数据库表存的内容,并且原来不存在的数据也不导入到数据库表;

3、当不指定update-key,且关系型数据库表存在主键时:

A、allowinsert模式时,报主键冲突,数据无变化;

B、updateonly模式时,报主键冲突,数据无变化;

4、当不指定update-key,且关系型数据库表不存在主键时:

A、allowinsert模式时,为全部数据追加导入到数据库表;

B、updateonly模式时,为全部数据追加导入到数据库表;

实际案例:

(1)mysql批量导入hive

使用shell脚本:

笔者目前用sqoop把mysql数据导入到Hive中,最后实现命令行导入,sqoop版本1.4.7,实现如下

最后需要把这个导入搞成job,每天定时去跑,实现数据的自动化增量导入,sqoop支持job的管理,可以把导入创建成job重复去跑,并且它会在metastore中记录增值,每次执行增量导入之前去查询

创建job命令如下

创建完job就可以去执行它了

sqoop job --exec users

可以把该指令设为Linux定时任务,或者用Azkaban定时去执行它

hive导出到MySQL时,date类型数据发生变化?

问题原因:时区设置问题,date -R查看服务器时间,show VARIABLES LIKE "%time_zone"查看Mysql时间,system并不表示中国的标准时间,要将时间设置为东八区

(1):对市面上最流行的两种调度器,给出以下详细对比,以供技术选型参考。总体来说,ooize相比azkaban是一个重量级的任务调度系统,功能全面,但配置使用也更复杂。如果可以不在意某些功能的缺失,轻量级调度器azkaban是很不错的候选对象。

(2):功能:

两者均可以调度mapreduce,pig,java,脚本工作流任务;

两者均可以定时执行工作流任务;

(3):工作流定义:

Azkaban使用Properties文件定义工作流;

Oozie使用XML文件定义工作流;

(4):工作流传参:

Azkaban支持直接传参,例如${input};

Oozie支持参数和EL表达式,例如${fs:dirSize(myInputDir)};

(5):定时执行:

Azkaban的定时执行任务是基于时间的;

Oozie的定时执行任务基于时间和输入数据;

(6):资源管理:

Azkaban有较严格的权限控制,如用户对工作流进行读/写/执行等操作;

Oozie暂无严格的权限控制;

(7):工作流执行:

Azkaban有两种运行模式,分别是solo server mode(executor server和web server部署在同一台节点)和multi server mode(executor server和web server可以部署在不同节点);

Oozie作为工作流服务器运行,支持多用户和多工作流;

(8):工作流管理:

Azkaban支持浏览器以及ajax方式操作工作流;

Oozie支持命令行、HTTP REST、Java API、浏览器操作工作流;

浏览器页面访问

使用Oozie时通常整合hue,用户数据仓库调度

就是刚才选择的脚本

脚本里需要的参数,尽量设置为动态自动获取,如 ${date}

第一步的参数是所有文件和当天日期,后面的只需要日期,最后一步是导出所有结果,相应填入

添加文件和设置相应参数

运行后会有状态提示页面,可以看到任务进度

点击调度任务的页面情况

修改定时任务名和描述

添加需要定时调度的任务

sm-workflow的参数都是写死的,没有设置动态,这里的下拉列表就不会有可选项。

设置参数

将sm-workflow的日期修改为 ${do_date},保存

进入定时计划sm-dw中,会看到有参数 do_date

填入相应参数,前一天日期

Oozie常用系统常量

当然,也可以通过这样将参数传入workflow任务中,代码或者shell中需要的参数。

如,修改sm-workflow 中的 sqoop_import.sh,添加一个参数 ${num}。

编辑文件(需要登陆Hue的用户有对HDFS操作的权限),修改shell中的一个值为参数,保存。

在workflow中,编辑添加参数 ${num} ,或者num=${num} 保存。

进入schedule中,可以看到添加的参数,编辑输入相应参数即可。

Bundle统一管理所有定时调度,阶段划分:Bundle Schedule workflow

简述Java虚拟机(JVM)执行的三大任务

1、编译java文件为class文件

2、执行class文件中的代码

3、内存管理,垃圾回收

在JAVA中如何实现长时间任务

一、问题背景 在应用程序中我们经常需要一个类去完成像数据处理、监听事件或检查另一个类的活动等任务。为了达到这个目标,我们可能使用带有一套锁和消息通知的线程。JAVA 线程API已经很好的文档化,但为了使线程能够正确而高效地运行,程序员仍然需要丰富的编程经验并编写大量的代码。通过应用本篇文章中讨论的框架,程序员能够避免忍受煎熬写大量的代码,快速创建健壮的应用程序。 二、长时间运行任务的程序框架 Framework for long-running tasksThe primary thing about a long-lived task is that it should somehow be kept running during the application lifetime. The right way to accomplish this is to provide a thread of execution for a particular task. You create a task as a thread or as an implementation of the java.lang.Runnable interface. If you implement Runnable, you can gain better object-oriented design and avoid the single-inheritance problems. You can also more efficiently manipulate with Runnable instances, for example, using a thread pool that usually needs a Runnable instance, not a thread, to run. 关于长时间运行的任务的主要事情是如何在应用程序的生命期使它一直保持运行。实现的恰当方法是提供一个线程来执行这个特定的任务。我们可以通过继承Thread类或实现java.lang.Runnable接口来达到该目标。如果采用实现Runnable接口的方式,就可以能够获得更好的面向对象的设计,同时可以避免JAVA中的单继承问题。另外,我们也能更有效的处理Runnable实例(例如使用线程池通常需要一个Runnable实例而不是线程来运行)。 The essence of the framework is the abstract class Worker ( Listing A), which implements the Runnable interface and provides the helper methods for efficient task handling. Some of the methods are fully implemented, like the run() method, but some are abstract and have to be filled by you. If you want to create a long-running class, you need only to extend the Worker class and implement several abstract methods. Let’s look at these methods in more detail. 框架的基础是一个叫Worker的抽象类,它实现了Runnable接口,并提供了有效处理任务的好方法。这些方法有些已经被实现,如run()方法,但有些是抽象方法,开发人员必须自己来实现。如果要创建一个长时间运行的类,你只需要继承Worker类并实现几个抽象方法。让我们看看这些方法的细节。 The run() method of the Worker class is designed to continuously execute the work() method until it is stopped. The work() method can be responsible for data processing, reaction to some event, file reading or writing, SQL execution, etc. It can throw an exception, so it is a good practice to propagate it and let the run() method handle it. Worker 类的run()方法被设计成只要不停止运行就持续的执行work()方法。work()方法可以负责数据处理、事件响应、文件读写、,执行SQL命令等操作。这样work()方法能够抛出异常,并将异常传给run(),然后由run()方法来处理这些异常。 The run() method has two levels of try-catch clause: outside and inside the while-loop. The first try-catch clause is meant to catch all nonprogrammed exceptions and guarantee that the run() method never exits. The second clause will catch any kind of exceptions belonging to business logic and behave accordingly. If some waiting operation takes place in the work() method (e.g., waiting on an InputStream or a Socket), it is advisable to propagate an InterruptedException. The thing to keep in mind is that the work() method does not need to have any while-loop to keep it going as long as an application runs. The Worker does this for you. run()方法有内外两层try-catch语句:一层处于while-loop循环外,一层在while-loop循环内。前一个try-catch用于捕获非编程异常以确保run()方法不退出。后一个try-catch语句捕获关于业务逻辑和相应行为的各种异常。如果在work()方法中发生了一些等待操作(例如等待一个输入流或一个Socket),抛出一个InterruptedException的方法是可取的。要记住的是只要应用程序在运行,work()方法不需要任何while-loop循环去维持它运行,这一切由Worker代办了。 When the run() method starts, it calls the prepareWorker() which is designed to prepare all resources needed for a long-running task (Listing A). In this method call, you can, for example, establish a database connection or open a file that will be used further. It is especially good to place here some blocking operations like opening a socket, because they will be done in a separate thread and thus will not block the main thread of execution. run()开始时,调用prepareWorker()方法来准备长时间运行任务需要的所有资源(参考程序清单A)。例如 ,在这个方法中可以打开一个将要用到的数据库连接或文件。尤其对于那些像建立一个socket这样的阻塞操作放在这儿是很好的。因为若让它们在一个独立的线程中运行,则不会阻塞主线程的执行。 The opposite of the previous method is the releaseWorker() which is called when the run() method is about to exit (Listing A). Here, you can put the code to dispose of system resources used by this task or to perform other cleanup. This method is similar to java.lang.Object.finalize(), but it is explicitly called before a thread terminates. 与前面方法相反的是releaseWorker(),它在run()方法准备退出时被调用(参考程序清单A)。在该方法中你可以编写那些释放系统资源或执行其它清除动作的代码。该方法类似于java.lang.Object.finalize(),但它在线程中止时被显式的调用。 三、框架中的错误处理机制 Handling errors in the frameworkAnother important method is the handleError(), which takes a java.lang.Throwable as a parameter. This method is called each time an error situation occurs within the run() method. It is up to you how to implement error handling. One way is to log errors and control task termination by calling halt() method (Listing A). The isCondition() method is used to tell whether execution of the work() method can be started, thus allowing granular control over a task. It is useful in event-triggered frameworks when execution of the work() method is pending until some condition?for example, a buffer is not empty?is fulfilled. In Worker’s implementation, the condition is checked upon a lock notification and periodically with a time interval you specify in the setTimeout() method (Listing A). If you don’t need any waiting blocks in a task, just make the isCondition() method always return true. isCondition()方法用于判断work()方法是否能够被执行。因此允许细粒度地控制任务。这在事件触发的框架中非常有用。当work()方法的执行条件未满足时,work方法将被挂起,直到条件完全满足(例如,缓存区非空)。在Worker的实现中这个条件将按在方法setTimeout()中指定的时间周期地检查一个锁通知。如果在任务中不需要任何等待阻塞,仅仅只要使isCondition()方法总是返回真值。四、任务终止时机

JAVA任务流的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于Java任务流程怎么设计、JAVA任务流的信息别忘了在本站进行查找喔。

The End

发布于:2022-12-15,除非注明,否则均为首码项目网原创文章,转载请注明出处。