关于java写parquet的信息

博主:adminadmin 2022-11-23 02:34:07 68

本篇文章给大家谈谈java写parquet,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

hive 插入parquet二级分区表数据倾斜优化

单个表每天数据有50亿左右。需用二级分区优化该表。

错误:

Java Heap Space。或者GC overhead limit exceeded。

原因:

Parquet和ORC是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行INSERT语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致mappers或reducers的OOM,具体取决于打开的文件写入器(file writer)的数量。

通过INSERT语句插入数据到动态分区表中,也可能会超过HDFS同时打开文件数的限制。

如果没有join或聚合,INSERT ... SELECT语句会被转换为只有map任务的作业。mapper任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个mapper必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper在运行时所需的内存量随着它遇到的分区数量的增加而增加。

详细原因:

set hive.optimize.sort.dynamic.partition = true,从新跑上述语句。

通过这个优化,这个只有map任务的mapreduce会引入reduce过程,这样动态分区的那个字段比如日期在传到reducer时会被排序。由于分区字段是排序的,因此每个reducer只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写parquet文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。

但reduce阶段一直卡在99%,判断是uiappid数据倾斜导致。验证数据倾斜:

然后你会发现跑得特别慢。开启map group优化(Map端部分聚合,相当于Combiner):

设置上述参数即可。若是其他情况的group优化,可参考hive.groupby.skewindata参数。

有数据倾斜的时候进行负载均衡,当hive.groupby.skewindata设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

分两步:

1、第一步:找出条数大于1亿的uiappid后,select时过滤调这些大的uiappid。通过这个优化过,reduce阶段单个key的数据都不超过1亿条,可以快速得到结果。

2、第二步:再次将uiappid条数大于1亿的数据插入表中。因为大于1亿条的uiappid比较少,可以为每个mapper遇到的分区创建一个文件写入器(file writer)。

parquet(2)读写

1、大多数情况下,我们会使用高级工具来处理parquet文件,比如hive spark impala,不过有时候我们也需要进行低级顺序访问

2、parquet具有一个可插入式的内存数据模型,其作用是要让parquet文件格式更好地与类型广泛的各种工具集成,在java中,这种集成体现在readSupport 和 WriteSupport上

3、parquet写入

3.1、创建(MessageType)schema

3.2、创建parquet message实例 Group group

3.3、创建Groupwritersupport

3.4、创建parquetWriter

3.5、调用parquetwriter的write方法,最后closewriter

4、parquet读文件,更简单,不需要设置文件属性

4.1、创建groupreadsupport

4.2、创建parquetreader

4.3、调用read方法

5、大多数程序更倾向于使用avro、protocol buffers 或者thrift这样的框架来定义数据模型,parquet则迎合了这些需求

6、如avroparquetwriter protoparquetwriter thriftparquetwriter以及其分别对应的writer

上回话周朝东迁洛阳,此回话春秋时代

周王朝所属的每一个封国,都有自己完整的本国史,但是只有鲁国的留传下来,鲁国史称为“春秋”,所以史学家把公元前722年到公元前481年称为春秋时代。

卫国首先于719年政变,政变失败,接下来鲁国政变,宋国政变

宋国国君子与夷跟他的国防总司令孔父嘉是好朋友,孔父嘉的妻子十分美丽,大臣华督见了,立刻神魂颠倒,但是她的身份高贵,华督不能直接抢夺。那时,子与夷堂弟子冯流亡郑国,华督派人跟他联络,恰好孔父嘉积极训练军队准备出猎,华督散步谣言,“孔父嘉每次都被郑国打败”,煽动士兵,士兵祈求华督伸手援救,华督就率领他们攻杀孔父嘉,顺便把子与夷也杀了,子冯到了国君位置,华督得到了孔父嘉的妻子。

因为妻子过于漂亮引来杀身之祸的,孔父嘉是历史上第一人,但是因为美女而引起政权转移,王朝瓦解国家覆灭的却在以后经常出现。

使用Hive SQL插入动态分区的Parquet表OOM异常分析

1.异常描述

当运行“INSERT ... SELECT”语句向 Parquet 或者 ORC 格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无法正常执行。

Hive 客户端:

(可左右滑动)

YARN 的 8088 中查看具体 map task 报错:

(可左右滑动)

2.异常分析

Parquet 和 ORC 是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行 INSERT 语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致 mappers 或 reducers 的 OOM,具体取决于打开的文件写入器(file writer)的数量。

通过 INSERT 语句插入数据到动态分区表中,也可能会超过 HDFS 同时打开文件数的限制。

如果没有 join 或聚合,INSERT ... SELECT 语句会被转换为只有 map 任务的作业。mapper 任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个 mapper 必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper 在运行时所需的内存量随着它遇到的分区数量的增加而增加。

3.异常重现与解决

3.1.生成动态分区的几个参数说明

hive.exec.dynamic.partition

默认值:false

是否开启动态分区功能,默认 false 关闭。

使用动态分区时候,该参数必须设置成 true;

hive.exec.dynamic.partition.mode

默认值:strict

动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。

一般需要设置为 nonstrict

hive.exec.max.dynamic.partitions.pernode

默认值:100

在每个执行 MR 的节点上,最大可以创建多少个动态分区。

该参数需要根据实际的数据来设定。

比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。

hive.exec.max.dynamic.partitions

默认值:1000

在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。

同上参数解释。

hive.exec.max.created.files

默认值:100000

整个 MR Job 中,最大可以创建多少个 HDFS 文件。

一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于 100000,可根据实际情况加以调整。

mapreduce.map.memory.mb

map 任务的物理内存分配值,常见设置为 1GB,2GB,4GB 等。

mapreduce.map.java.opts

map 任务的 Java 堆栈大小设置,一般设置为小于等于上面那个值的 75%,这样可以保证 map 任务有足够的堆栈外内存空间。

mapreduce.input.fileinputformat.split.maxsize

mapreduce.input.fileinputformat.split.minsize

这个两个参数联合起来用,主要是为了方便控制 mapreduce 的 map 数量。比如我设置为 1073741824,就是为了让每个 map 处理 1GB 的文件。

3.2.一个例子

Fayson 在前两天给人调一个使用 Hive SQL 插入动态分区的 Parquet 表时,总是报错 OOM,也是折腾了很久。以下我们来看看整个过程。

1.首先我们看看执行脚本的内容,基本其实就是使用 Hive 的 insert 语句将文本数据表插入到另外一张 parquet 表中,当然使用了动态分区。

2.我们看看原始数据文件,是文本文件,一共 120 个,每个 30GB 大小,总共差不多 3.6TB。

3.我们看看报错

4.因为是一个只有 map 的 mapreduce 任务,当我们从 YARN 的 8088 观察这个作业时可以发现,基本没有一个 map 能够执行成功,全部都是失败的。报上面的错误。

5.把 mapreduce.map.memory.mb 从 2GB 增大到 4GB,8GB,16GB,相应 mapreduce.map.java.opts 增大到 3GB,6GB,12GB。依旧报错 OOM。

6.后面又将 mapreduce.input.fileinputformat.split.maxsize 从 1GB,减少为 512MB,256MB,从而增大 map 数量,缩小单个 map 处理文件的大小。依旧报错 OOM。

7.最后启用 hive.optimize.sort.dynamic.partition,增加 reduce 过程,作业执行成功。

8.最后查看结果文件大约 1.2TB,约为输入文件的三分之一。一共 1557 个分区,最大的分区文件为 2GB。

4.异常总结

对于这个异常,我们建议有以下三种方式来处理:

1.启用 hive.optimize.sort.dynamic.partition,将其设置为 true。通过这个优化,这个只有 map 任务的 mapreduce 会引入 reduce 过程,这样动态分区的那个字段比如日期在传到 reducer 时会被排序。由于分区字段是排序的,因此每个 reducer 只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写 parquet 文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。

2.第二种方式就是增加每个 mapper 的内存分配,即增大 mapreduce.map.memory.mb 和 mapreduce.map.java.opts,这样所有文件写入器(filewriter)缓冲区对应的内存会更充沛。

3.将查询分解为几个较小的查询,以减少每个查询创建的分区数量。这样可以让每个 mapper 打开较少的文件写入器(file writer)。

备注:

默认情况下,Hive 为每个打开的 Parquet 文件缓冲区(file buffer)分配 128MB。这个 buffer 大小由参数 parquet.block.size 控制。为获得最佳性能,parquet 的 buffer size 需要与 HDFS 的 block size 保持对齐(比如相等),从而使每个 parquet 文件在单个 HDFS 的块中,以便每个 I/O 请求都可以读取整个数据文件,而无需通过网络传输访问后续的 block。

参考:

如何将java对象转成parquet文件

把文本文件 直接转 parquet

可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量

压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间

只读取需要的列,支持向量运算,能够获取更好的扫描性能

Parquet就是基于Google的Dremel系统的数据模型和算法实现的。核心思想是使用“record shredding and assembly algorithm”来表示复杂的嵌套数据类型,同时辅以按列的高效压缩和编码技术,实现降低存

与Avro之前新统计系统的日志都是用Avro做序列化和存储,鉴于Parquet的优势和对Avro的兼容,将HDFS上的存储格式改为Paruqet,并且只需做很小的改动就用原读取Avro的API读取Parquet,以提高近一个数量级。

Parquet文件尾部存储了文件的元数据信息和统计信息,自描述的,方便解析

3W服务器采用什么方式来存储文件?

CPH服务提供云手机批量控制功能

该功能可以通过ADB命令API将存放在OBS桶中的APK安装文件等,批量推送或安装到云手机中,便捷用户操作,提升管理效率。本章节通过批量安装APK的示例来介绍批量控制功能。安装和更新APK有以下两种方式:通过API执行install命令进行操作,详情请参考安装apk。用户先将OBS桶中的安装包,授予读取权

Parquet采用柱状存储格式并且支持数据嵌套,Parquet-format项目由java实现,它定义了所有Parquet元数据对象(Parquet内部的数据类型、存储格式),Parquet的元数据是使用Apache Thrift进行序列化并存储在Parquet文件的尾部。本文主要是描述如何将CDH发行版本中的Parquet-format

关于java写parquet和的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

The End

发布于:2022-11-23,除非注明,否则均为首码项目网原创文章,转载请注明出处。