关于java写parquet的信息
本篇文章给大家谈谈java写parquet,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、hive 插入parquet二级分区表数据倾斜优化
- 2、parquet(2)读写
- 3、使用Hive SQL插入动态分区的Parquet表OOM异常分析
- 4、如何将java对象转成parquet文件
- 5、3W服务器采用什么方式来存储文件?
hive 插入parquet二级分区表数据倾斜优化
单个表每天数据有50亿左右。需用二级分区优化该表。
错误:
Java Heap Space。或者GC overhead limit exceeded。
原因:
Parquet和ORC是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行INSERT语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致mappers或reducers的OOM,具体取决于打开的文件写入器(file writer)的数量。
通过INSERT语句插入数据到动态分区表中,也可能会超过HDFS同时打开文件数的限制。
如果没有join或聚合,INSERT ... SELECT语句会被转换为只有map任务的作业。mapper任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个mapper必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper在运行时所需的内存量随着它遇到的分区数量的增加而增加。
详细原因:
set hive.optimize.sort.dynamic.partition = true,从新跑上述语句。
通过这个优化,这个只有map任务的mapreduce会引入reduce过程,这样动态分区的那个字段比如日期在传到reducer时会被排序。由于分区字段是排序的,因此每个reducer只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写parquet文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。
但reduce阶段一直卡在99%,判断是uiappid数据倾斜导致。验证数据倾斜:
然后你会发现跑得特别慢。开启map group优化(Map端部分聚合,相当于Combiner):
设置上述参数即可。若是其他情况的group优化,可参考hive.groupby.skewindata参数。
有数据倾斜的时候进行负载均衡,当hive.groupby.skewindata设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
分两步:
1、第一步:找出条数大于1亿的uiappid后,select时过滤调这些大的uiappid。通过这个优化过,reduce阶段单个key的数据都不超过1亿条,可以快速得到结果。
2、第二步:再次将uiappid条数大于1亿的数据插入表中。因为大于1亿条的uiappid比较少,可以为每个mapper遇到的分区创建一个文件写入器(file writer)。
parquet(2)读写
1、大多数情况下,我们会使用高级工具来处理parquet文件,比如hive spark impala,不过有时候我们也需要进行低级顺序访问
2、parquet具有一个可插入式的内存数据模型,其作用是要让parquet文件格式更好地与类型广泛的各种工具集成,在java中,这种集成体现在readSupport 和 WriteSupport上
3、parquet写入
3.1、创建(MessageType)schema
3.2、创建parquet message实例 Group group
3.3、创建Groupwritersupport
3.4、创建parquetWriter
3.5、调用parquetwriter的write方法,最后closewriter
4、parquet读文件,更简单,不需要设置文件属性
4.1、创建groupreadsupport
4.2、创建parquetreader
4.3、调用read方法
5、大多数程序更倾向于使用avro、protocol buffers 或者thrift这样的框架来定义数据模型,parquet则迎合了这些需求
6、如avroparquetwriter protoparquetwriter thriftparquetwriter以及其分别对应的writer
上回话周朝东迁洛阳,此回话春秋时代
周王朝所属的每一个封国,都有自己完整的本国史,但是只有鲁国的留传下来,鲁国史称为“春秋”,所以史学家把公元前722年到公元前481年称为春秋时代。
卫国首先于719年政变,政变失败,接下来鲁国政变,宋国政变
宋国国君子与夷跟他的国防总司令孔父嘉是好朋友,孔父嘉的妻子十分美丽,大臣华督见了,立刻神魂颠倒,但是她的身份高贵,华督不能直接抢夺。那时,子与夷堂弟子冯流亡郑国,华督派人跟他联络,恰好孔父嘉积极训练军队准备出猎,华督散步谣言,“孔父嘉每次都被郑国打败”,煽动士兵,士兵祈求华督伸手援救,华督就率领他们攻杀孔父嘉,顺便把子与夷也杀了,子冯到了国君位置,华督得到了孔父嘉的妻子。
因为妻子过于漂亮引来杀身之祸的,孔父嘉是历史上第一人,但是因为美女而引起政权转移,王朝瓦解国家覆灭的却在以后经常出现。
使用Hive SQL插入动态分区的Parquet表OOM异常分析
1.异常描述
当运行“INSERT ... SELECT”语句向 Parquet 或者 ORC 格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无法正常执行。
Hive 客户端:
(可左右滑动)
YARN 的 8088 中查看具体 map task 报错:
(可左右滑动)
2.异常分析
Parquet 和 ORC 是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行 INSERT 语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致 mappers 或 reducers 的 OOM,具体取决于打开的文件写入器(file writer)的数量。
通过 INSERT 语句插入数据到动态分区表中,也可能会超过 HDFS 同时打开文件数的限制。
如果没有 join 或聚合,INSERT ... SELECT 语句会被转换为只有 map 任务的作业。mapper 任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个 mapper 必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper 在运行时所需的内存量随着它遇到的分区数量的增加而增加。
3.异常重现与解决
3.1.生成动态分区的几个参数说明
hive.exec.dynamic.partition
默认值:false
是否开启动态分区功能,默认 false 关闭。
使用动态分区时候,该参数必须设置成 true;
hive.exec.dynamic.partition.mode
默认值:strict
动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。
一般需要设置为 nonstrict
hive.exec.max.dynamic.partitions.pernode
默认值:100
在每个执行 MR 的节点上,最大可以创建多少个动态分区。
该参数需要根据实际的数据来设定。
比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。
hive.exec.max.dynamic.partitions
默认值:1000
在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。
同上参数解释。
hive.exec.max.created.files
默认值:100000
整个 MR Job 中,最大可以创建多少个 HDFS 文件。
一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于 100000,可根据实际情况加以调整。
mapreduce.map.memory.mb
map 任务的物理内存分配值,常见设置为 1GB,2GB,4GB 等。
mapreduce.map.java.opts
map 任务的 Java 堆栈大小设置,一般设置为小于等于上面那个值的 75%,这样可以保证 map 任务有足够的堆栈外内存空间。
mapreduce.input.fileinputformat.split.maxsize
mapreduce.input.fileinputformat.split.minsize
这个两个参数联合起来用,主要是为了方便控制 mapreduce 的 map 数量。比如我设置为 1073741824,就是为了让每个 map 处理 1GB 的文件。
3.2.一个例子
Fayson 在前两天给人调一个使用 Hive SQL 插入动态分区的 Parquet 表时,总是报错 OOM,也是折腾了很久。以下我们来看看整个过程。
1.首先我们看看执行脚本的内容,基本其实就是使用 Hive 的 insert 语句将文本数据表插入到另外一张 parquet 表中,当然使用了动态分区。
2.我们看看原始数据文件,是文本文件,一共 120 个,每个 30GB 大小,总共差不多 3.6TB。
3.我们看看报错
4.因为是一个只有 map 的 mapreduce 任务,当我们从 YARN 的 8088 观察这个作业时可以发现,基本没有一个 map 能够执行成功,全部都是失败的。报上面的错误。
5.把 mapreduce.map.memory.mb 从 2GB 增大到 4GB,8GB,16GB,相应 mapreduce.map.java.opts 增大到 3GB,6GB,12GB。依旧报错 OOM。
6.后面又将 mapreduce.input.fileinputformat.split.maxsize 从 1GB,减少为 512MB,256MB,从而增大 map 数量,缩小单个 map 处理文件的大小。依旧报错 OOM。
7.最后启用 hive.optimize.sort.dynamic.partition,增加 reduce 过程,作业执行成功。
8.最后查看结果文件大约 1.2TB,约为输入文件的三分之一。一共 1557 个分区,最大的分区文件为 2GB。
4.异常总结
对于这个异常,我们建议有以下三种方式来处理:
1.启用 hive.optimize.sort.dynamic.partition,将其设置为 true。通过这个优化,这个只有 map 任务的 mapreduce 会引入 reduce 过程,这样动态分区的那个字段比如日期在传到 reducer 时会被排序。由于分区字段是排序的,因此每个 reducer 只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写 parquet 文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。
2.第二种方式就是增加每个 mapper 的内存分配,即增大 mapreduce.map.memory.mb 和 mapreduce.map.java.opts,这样所有文件写入器(filewriter)缓冲区对应的内存会更充沛。
3.将查询分解为几个较小的查询,以减少每个查询创建的分区数量。这样可以让每个 mapper 打开较少的文件写入器(file writer)。
备注:
默认情况下,Hive 为每个打开的 Parquet 文件缓冲区(file buffer)分配 128MB。这个 buffer 大小由参数 parquet.block.size 控制。为获得最佳性能,parquet 的 buffer size 需要与 HDFS 的 block size 保持对齐(比如相等),从而使每个 parquet 文件在单个 HDFS 的块中,以便每个 I/O 请求都可以读取整个数据文件,而无需通过网络传输访问后续的 block。
参考:
如何将java对象转成parquet文件
把文本文件 直接转 parquet
可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量
压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间
只读取需要的列,支持向量运算,能够获取更好的扫描性能
Parquet就是基于Google的Dremel系统的数据模型和算法实现的。核心思想是使用“record shredding and assembly algorithm”来表示复杂的嵌套数据类型,同时辅以按列的高效压缩和编码技术,实现降低存
与Avro之前新统计系统的日志都是用Avro做序列化和存储,鉴于Parquet的优势和对Avro的兼容,将HDFS上的存储格式改为Paruqet,并且只需做很小的改动就用原读取Avro的API读取Parquet,以提高近一个数量级。
Parquet文件尾部存储了文件的元数据信息和统计信息,自描述的,方便解析
3W服务器采用什么方式来存储文件?
CPH服务提供云手机批量控制功能
该功能可以通过ADB命令API将存放在OBS桶中的APK安装文件等,批量推送或安装到云手机中,便捷用户操作,提升管理效率。本章节通过批量安装APK的示例来介绍批量控制功能。安装和更新APK有以下两种方式:通过API执行install命令进行操作,详情请参考安装apk。用户先将OBS桶中的安装包,授予读取权
Parquet采用柱状存储格式并且支持数据嵌套,Parquet-format项目由java实现,它定义了所有Parquet元数据对象(Parquet内部的数据类型、存储格式),Parquet的元数据是使用Apache Thrift进行序列化并存储在Parquet文件的尾部。本文主要是描述如何将CDH发行版本中的Parquet-format
关于java写parquet和的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
发布于:2022-11-23,除非注明,否则均为
原创文章,转载请注明出处。