关于javaparquet的信息
今天给各位分享javaparquet的知识,其中也会对进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
本文目录一览:
- 1、使用Hive SQL插入动态分区的Parquet表OOM异常分析
- 2、如何建立一个完整可用的安全大数据平台
- 3、RDD,DataFrame和DataSet的区别
- 4、hive 插入parquet二级分区表数据倾斜优化
- 5、parquet(2)读写
- 6、如何将java对象转成parquet文件
使用Hive SQL插入动态分区的Parquet表OOM异常分析
1.异常描述
当运行“INSERT ... SELECT”语句向 Parquet 或者 ORC 格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无法正常执行。
Hive 客户端:
(可左右滑动)
YARN 的 8088 中查看具体 map task 报错:
(可左右滑动)
2.异常分析
Parquet 和 ORC 是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行 INSERT 语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致 mappers 或 reducers 的 OOM,具体取决于打开的文件写入器(file writer)的数量。
通过 INSERT 语句插入数据到动态分区表中,也可能会超过 HDFS 同时打开文件数的限制。
如果没有 join 或聚合,INSERT ... SELECT 语句会被转换为只有 map 任务的作业。mapper 任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个 mapper 必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper 在运行时所需的内存量随着它遇到的分区数量的增加而增加。
3.异常重现与解决
3.1.生成动态分区的几个参数说明
hive.exec.dynamic.partition
默认值:false
是否开启动态分区功能,默认 false 关闭。
使用动态分区时候,该参数必须设置成 true;
hive.exec.dynamic.partition.mode
默认值:strict
动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。
一般需要设置为 nonstrict
hive.exec.max.dynamic.partitions.pernode
默认值:100
在每个执行 MR 的节点上,最大可以创建多少个动态分区。
该参数需要根据实际的数据来设定。
比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。
hive.exec.max.dynamic.partitions
默认值:1000
在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。
同上参数解释。
hive.exec.max.created.files
默认值:100000
整个 MR Job 中,最大可以创建多少个 HDFS 文件。
一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于 100000,可根据实际情况加以调整。
mapreduce.map.memory.mb
map 任务的物理内存分配值,常见设置为 1GB,2GB,4GB 等。
mapreduce.map.java.opts
map 任务的 Java 堆栈大小设置,一般设置为小于等于上面那个值的 75%,这样可以保证 map 任务有足够的堆栈外内存空间。
mapreduce.input.fileinputformat.split.maxsize
mapreduce.input.fileinputformat.split.minsize
这个两个参数联合起来用,主要是为了方便控制 mapreduce 的 map 数量。比如我设置为 1073741824,就是为了让每个 map 处理 1GB 的文件。
3.2.一个例子
Fayson 在前两天给人调一个使用 Hive SQL 插入动态分区的 Parquet 表时,总是报错 OOM,也是折腾了很久。以下我们来看看整个过程。
1.首先我们看看执行脚本的内容,基本其实就是使用 Hive 的 insert 语句将文本数据表插入到另外一张 parquet 表中,当然使用了动态分区。
2.我们看看原始数据文件,是文本文件,一共 120 个,每个 30GB 大小,总共差不多 3.6TB。
3.我们看看报错
4.因为是一个只有 map 的 mapreduce 任务,当我们从 YARN 的 8088 观察这个作业时可以发现,基本没有一个 map 能够执行成功,全部都是失败的。报上面的错误。
5.把 mapreduce.map.memory.mb 从 2GB 增大到 4GB,8GB,16GB,相应 mapreduce.map.java.opts 增大到 3GB,6GB,12GB。依旧报错 OOM。
6.后面又将 mapreduce.input.fileinputformat.split.maxsize 从 1GB,减少为 512MB,256MB,从而增大 map 数量,缩小单个 map 处理文件的大小。依旧报错 OOM。
7.最后启用 hive.optimize.sort.dynamic.partition,增加 reduce 过程,作业执行成功。
8.最后查看结果文件大约 1.2TB,约为输入文件的三分之一。一共 1557 个分区,最大的分区文件为 2GB。
4.异常总结
对于这个异常,我们建议有以下三种方式来处理:
1.启用 hive.optimize.sort.dynamic.partition,将其设置为 true。通过这个优化,这个只有 map 任务的 mapreduce 会引入 reduce 过程,这样动态分区的那个字段比如日期在传到 reducer 时会被排序。由于分区字段是排序的,因此每个 reducer 只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写 parquet 文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。
2.第二种方式就是增加每个 mapper 的内存分配,即增大 mapreduce.map.memory.mb 和 mapreduce.map.java.opts,这样所有文件写入器(filewriter)缓冲区对应的内存会更充沛。
3.将查询分解为几个较小的查询,以减少每个查询创建的分区数量。这样可以让每个 mapper 打开较少的文件写入器(file writer)。
备注:
默认情况下,Hive 为每个打开的 Parquet 文件缓冲区(file buffer)分配 128MB。这个 buffer 大小由参数 parquet.block.size 控制。为获得最佳性能,parquet 的 buffer size 需要与 HDFS 的 block size 保持对齐(比如相等),从而使每个 parquet 文件在单个 HDFS 的块中,以便每个 I/O 请求都可以读取整个数据文件,而无需通过网络传输访问后续的 block。
参考:
如何建立一个完整可用的安全大数据平台
“
要建立一个大数据系统,我们需要从数据流的源头跟踪到最后有价值的输出,并在现有的Hadoop和大数据生态圈内根据实际需求挑选并整合各部分合适的组件来构建一个能够支撑多种查询和分析功能的系统平台。这其中既包括了对数据存储的选择,也涵盖了数据线上和线下处理分离等方面的思考和权衡。此外,没有任何一个引入大数据解决方案的商业应用在生产环境上承担的起安全隐患。
1
计算框架篇
大数据的价值
只有在能指导人们做出有价值的决定时,数据才能体现其自身的价值。因此,大数据技术要服务于实际的用途,才是有意义的。一般来说,大数据可以从以下三个方面指导人们做出有价值的决定:
报表生成(比如根据用户历史点击行为的跟踪和综合分析、 应用程序活跃程度和用户粘性计算等);
诊断分析(例如分析为何用户粘性下降、根据日志分析系统为何性能下降、垃圾邮件以及病毒的特征检测等);
决策(例如个性化新闻阅读或歌曲推荐、预测增加哪些功能能增加用户粘性、帮助广告主进行广告精准投放、设定垃圾邮件和病毒拦截策略等)。
图 1
进一步来看,大数据技术从以下三个方面解决了传统技术难以达成的目标(如图1):
在历史数据上的低延迟(交互式)查询,目标是加快决策过程和时间, 例如分析一个站点为何变缓慢并尝试修复它;
在实时数据上的低延迟查询,目的是帮助用户和应用程序在实时数据上做出决策, 例如实时检测并阻拦病毒蠕虫(一个病毒蠕虫可以在1.3秒内攻击1百万台主机);
更加精细高级的数据处理算法,这可以帮助用户做出“更好”的决策, 例如图数据处理、异常点检测、趋势分析及其他机器学习算法。
蛋糕模式
从将数据转换成价值的角度来说,在Hadoop生态圈十年蓬勃成长的过程中,YARN和Spark这二者可以算得上是里程碑事件。Yarn的出现使得集群资源管理和数据处理流水线分离,大大革新并推动了大数据应用层面各种框架的发展(SQL on Hadoop框架, 流数据,图数据,机器学习)。
它使得用户不再受到MapReduce开发模式的约束,而是可以创建种类更为丰富的分布式应用程序,并让各类应用程序运行在统一的架构上,消除了为其他框架维护独有资源的开销。就好比一个多层蛋糕,下面两层是HDFS和Yarn, 而MapReduce就只是蛋糕上层的一根蜡烛而已,在蛋糕上还能插各式各样的蜡烛。
在这一架构体系中,总体数据处理分析作业分三块(图2),在HBase上做交互式查询(Apache Phoenix, Cloudera Impala等), 在历史数据集上编写MapReduce程序抑或利用Hive等做批处理业务, 另外对于实时流数据分析Apache Storm则会是一种标准选择方案。
虽然Yarn的出现极大地丰富了Hadoop生态圈的应用场景,但仍存有两个显而易见的挑战:一是在一个平台上需要维护三个开发堆栈;二是在不同框架内很难共享数据,比如很难在一个框架内对流数据做交互式查询。这也意味着我们需要一个更为统一和支持更好抽象的计算框架的出现。
图 2
一统江湖
Spark的出现使得批处理任务,交互式查询,实时流数据处理被整合到一个统一的框架内(图3),同时Spark和现有的开源生态系统也能够很好地兼容(Hadoop, HDFS, Yarn, Hive, Flume)。 通过启用内存分布数据集,优化迭代工作负载, 用户能够更简单地操作数据,并在此基础上开发更为精细的算法,如机器学习和图算法等。
有三个最主要的原因促使Spark目前成为了时下最火的大数据开源社区(拥有超过来自200多个公司的800多个contributors):
Spark可以扩展部署到超过8000节点并处理PB级别的数据,同时也提供了很多不错的工具供应用开发者进行管理和部署;
Spark提供了一个交互式shell供开发者可以用Scala或者Python即时性试验不同的功能;
Spark提供了很多内置函数使得开发者能够比较容易地写出低耦合的并且能够并发执行的代码,这样开发人员就更能集中精力地为用户提供更多的业务功能而不是花费时间在优化并行化代码之上。
当然Spark也和当年的MapReduce一样不是万灵药,比如对实时性要求很高的流数据处理上Apache Storm还是被作为主流选择, 因为Spark Streaming实际上是microbatch(将一个流数据按时间片切成batch,每个batch提交一个job)而不是事件触发实时系统,所以虽然支持者们认为microbatch在系统延时性上贡献并不多,但在生产环境中和Apache Storm相比还不是特别能满足对低延时要求很高的应用场景。
比如在实践过程中, 如果统计每条消息的平均处理时间,很容易达到毫秒级别,但一旦统计类似service assurance(确保某条消息在毫秒基本能被处理完成)的指标, 系统的瓶颈有时还是不能避免。
但同时我们不能不注意到,在许多用例当中,与流数据的交互以及和静态数据集的结合是很有必要的, 例如我们需要在静态数据集上进行分类器的模型计算,并在已有分类器模型的基础上,对实时进入系统的流数据进行交互计算来判定类别。
由于Spark的系统设计对各类工作(批处理、流处理以及交互式工作)进行了一个共有抽象,并且生态圈内延伸出了许多丰富的库(MLlib机器学习库、SQL语言API、GraphX), 使得用户可以在每一批流数据上进行灵活的Spark相关操作,在开发上提供了许多便利。
Spark的成熟使得Hadoop生态圈在短短一年之间发生了翻天覆地的变化, Cloudera和Hortonworks纷纷加入了Spark阵营,而Hadoop项目群中除了Yarn之外已经没有项目是必须的了(虽然Mesos已在一些场合替代了Yarn), 因为就连HDFS,Spark都可以不依赖。但很多时候我们仍然需要像Impala这样的依赖分布式文件系统的MPP解决方案并利用Hive管理文件到表的映射,因此Hadoop传统生态圈依然有很强的生命力。
另外在这里简要对比一下交互式分析任务中各类SQL on Hadoop框架,因为这也是我们在实际项目实施中经常遇到的问题。我们主要将注意力集中在Spark SQL, Impala和Hive on Tez上, 其中Spark SQL是三者之中历史最短的,论文发表在15年的SIGMOD会议上, 原文对比了数据仓库上不同类型的查询在Shark(Spark最早对SQL接口提供的支持)、Spark SQL和Impala上的性能比较。
也就是说, 虽然Spark SQL在Shark的基础上利用Catalyst optimizer在代码生成上做了很多优化,但总体性能还是比不上Impala, 尤其是当做join操作的时候, Impala可以利用“predicate pushdown”更早对表进行选择操作从而提高性能。
不过Spark SQL的Catalyst optimizer一直在持续优化中,相信未来会有更多更好的进展。Cloudera的Benchmark评测中Impala一直比其他SQL on Hadoop框架性能更加优越,但同时Hortonworks评测则指出虽然单个数据仓库查询Impala可以在很短的时间内完成,但是一旦并发多个查询Hive on Tez的优势就展示出来。另外Hive on Tez在SQL表达能力也要比Impala更强(主要是因为Impala的嵌套存储模型导致的), 因此根据不同的场景选取不同的解决方案是很有必要的。
图 3
各领风骚抑或代有才人出?
近一年比较吸引人眼球的Apache Flink(与Spark一样已有5年历史,前身已经是柏林理工大学一个研究性项目,被其拥趸推崇为继MapReduce, Yarn,Spark之后第四代大数据分析处理框架)。 与Spark相反,Flink是一个真正的实时流数据处理系统,它将批处理看作是流数据的特例,同Spark一样它也在尝试建立一个统一的平台运行批量,流数据,交互式作业以及机器学习,图算法等应用。
Flink有一些设计思路是明显区别于Spark的,一个典型的例子是内存管理,Flink从一开始就坚持自己精确的控制内存使用并且直接操作二进制数据,而Spark一直到1.5版本都还是试用java的内存管理来做数据缓存,这也导致了Spark很容易遭受OOM以及JVM GC带来的性能损失。
但是从另外一个角度来说, Spark中的RDD在运行时被存成java objects的设计模式也大大降低了用户编程设计门槛, 同时随着Tungsten项目的引入,Spark现在也逐渐转向自身的内存管理, 具体表现为Spark生态圈内从传统的围绕RDD(分布式java对象集合)为核心的开发逐渐转向以DataFrame(分布式行对象集合)为核心。
总的来说,这两个生态圈目前都在互相学习,Flink的设计基因更为超前一些,但Spark社区活跃度大很多,发展到目前毫无疑问是更为成熟的选择,比如对数据源的支持(HBase, Cassandra, Parquet, JSON, ORC)更为丰富以及更为统一简洁的计算表示。另一方面,Apache Flink作为一个由欧洲大陆发起的项目,目前已经拥有来自北美、欧洲以及亚洲的许多贡献者,这是否能够一改欧洲在开源世界中一贯的被动角色,我们将在未来拭目以待。
2
NoSQL数据库篇
NoSQL数据库在主流选择上依旧集中在MongoDB, HBase和Cassandra这三者之间。在所有的NoSQL选择中,用C 编写的MongoDB几乎应该是开发者最快也最易部署的选择。MongoDB是一个面向文档的数据库,每个文档/记录/数据(包括爬取的网页数据及其他大型对象如视频等)是以一种BSON(Binary JSON)的二进制数据格式存储, 这使得MongoDB并不需要事先定义任何模式, 也就是模式自由(可以把完全不同结构的记录放在同一个数据库里)。
MongoDB对于完全索引的支持在应用上是很方便的,同时也具备一般NoSQL分布式数据库中可扩展,支持复制和故障恢复等功能。 MongoDB一般应用于高度伸缩性的缓存及大尺寸的JSON数据存储业务中,但不能执行“JOIN”操作,而且数据占用空间也比较大,最被用户诟病的就是由于MongoDB提供的是数据库级锁粒度导致在一些情况下建索引操作会引发整个数据库阻塞。一般来说,MongoDB完全可以满足一些快速迭代的中小型项目的需求。
下面来主要谈谈Cassandra和HBase之间的比较选择。Cassandra和HBase有着截然不同的基因血统。HBase和其底层依赖的系统架构源自于著名的Google FileSystem(发表于2003年)和Google BigTable设计(发表于2006年), 其克服了HDFS注重吞吐量却牺牲I/O的缺点,提供了一个存储中间层使得用户或者应用程序可以随机读写数据。
具体来说,HBase的更新和删除操作实际上是先发生在内存MemStore中, 当MemStore满了以后会Flush到StoreFile, 之后当StoreFile文件数量增长到一定阈值后会触发Compact合并操作,因此HBase的更新操作其实是不断追加的操作,而最终所有更新和删除数据的持久化操作都是在之后Compact过程中进行的。
这使得应用程序在向内存MemStore写入数据后,所做的修改马上就能得到反映,用户读到的数据绝不会是陈旧的数据,保证了I/O高性能和数据完全一致性; 另一方面来说, HBase基于Hadoop生态系统的基因就已经决定了他自身的高度可扩展性、容错性。
在数据模型上,Cassandra和HBase类似实现了一个key-value提供面向列式存储服务,其系统设计参考了 Amazon Dynamo (发表于2007年) 分布式哈希(DHT)的P2P结构(实际上大部分Cassandra的初始工作都是由两位从Amazon的Dynamo组跳槽到Facebook的工程师完成),同样具有很高的可扩展性和容错性等特点。
除此之外, 相对HBase的主从结构,Cassandra去中心化的P2P结构能够更简单地部署和维护,比如增加一台机器只需告知Cassandra系统新节点在哪,剩下的交给系统完成就行了。同时,Cassandra对多数据中心的支持也更好,如果需要在多个数据中心进行数据迁移Cassandra会是一个更优的选择。
Eric Brewer教授提出的经典CAP理论认为任何基于网络的数据共享系统,最多只能满足数据一致性、可用性、分区容忍性三要素中的两个要素。实际分布式系统的设计过程往往都是在一致性与可用性上进行取舍,相比于HBase数据完全一致性的系统设计,Cassandra选择了在优先考虑数据可用性的基础上让用户自己根据应用程序需求决定系统一致性级别。
比如:用户可以配置QUONUM参数来决定系统需要几个节点返回数据才能向客户端做出响应,ONE指只要有一个节点返回数据就可以对客户端做出响应,ALL指等于数据复制份数的所有节点都返回结果才能向客户端做出响应,对于数据一致性要求不是特别高的可以选择ONE,它是最快的一种方式。
从基因和发展历史上来说,HBase更适合用做数据仓库和大规模数据处理与分析(比如对网页数据建立索引), 而Cassandra则更适合用作实时事务和交互式查询服务。Cassandra在国外市场占有比例和发展要远比国内红火, 在不少权威测评网站上排名都已经超过了HBase。目前Apache Cassandra的商业化版本主要由软件公司DataStax进行开发和销售推广。另外还有一些NoSQL分布式数据库如Riak, CouchDB也都在各自支持的厂商推动下取得了不错的发展。
虽然我们也考虑到了HBase在实际应用中的不便之处比如对二级索引的支持程度不够(只支持通过单个行键访问,通过行键的范围查询,全表扫描),不过在明略的大数据基础平台上,目前整合的是依然是HBase。
理由也很简单,HBase出身就与Hadoop的生态系统紧密集成,其能够很容易与其他SQL on Hadoop框架(Cloudera Impala, Apache Phoenix, or Hive on Tez)进行整合,而不需要重新部署一套分布式数据库系统,而且可以很方便地将同样的数据内容在同一个生态系统中根据不同框架需要来变换存储格式(比如存储成Hive表或者Parquet格式)。
我们在很多项目中都有需要用到多种SQL on Hadoop框架,来应对不同应用场景的情况,也体会到了在同一生态系统下部署多种框架的简便性。 但同时我们也遇到了一些问题, 因为HBase项目本身与HDFS和Zookeeper系统分别是由不同开源团队进行维护的,所以在系统整合时我们需要先对HBase所依赖的其他模块进行设置再对HBase进行配置,在一定程度上降低了系统维护的友好性。
目前我们也已经在考虑将Cassandra应用到一些新的客户项目中,因为很多企业级的应用都需要将线上线下数据库进行分离,HBase更适合存储离线处理的结果和数据仓库,而更适合用作实时事务和并发交互性能更好的Cassandra作为线上服务数据库会是一种很好的选择。
3
大数据安全篇
随着越来越多各式各样的数据被存储在大数据系统中,任何对企业级数据的破坏都是灾难性的,从侵犯隐私到监管违规,甚至会造成公司品牌的破坏并最终影响到股东收益。给大数据系统提供全面且有效的安全解决方案的需求已经十分迫切:
大数据系统存储着许多重要且敏感的数据,这些数据是企业长久以来的财富
与大数据系统互动的外部系统是动态变化的,这会给系统引入新的安全隐患
在一个企业的内部,不同Business Units会用不同的方式与大数据系统进行交互,比如线上的系统会实时给集群推送数据、数据科学家团队则需要分析存储在数据仓库内的历史数据、运维团队则会需要对大数据系统拥有管理权限。
因此为了保护公司业务、客户、财务和名誉免于被侵害,大数据系统运维团队必须将系统安全高度提高到和其他遗留系统一样的级别。同时大数据系统并不意味着引入大的安全隐患,通过精细完整的设计,仍然能够把一些传统的系统安全解决方案对接到最新的大数据集群系统中。
一般来说,一个完整的企业级安全框架包括五个部分:
Administration: 大数据集群系统的集中式管理,设定全局一致的安全策略
Authentication: 对用户和系统的认证
Authorization:授权个人用户和组对数据的访问权限
Audit:维护数据访问的日志记录
Data Protection:数据脱敏和加密以达到保护数据的目的
系统管理员要能够提供覆盖以上五个部分的企业级安全基础设施,否则任何一环的缺失都可能给整个系统引入安全性风险。
在大数据系统安全集中式管理平台这块,由Hortonworks推出的开源项目Apache Ranger就可以十分全面地为用户提供Hadoop生态圈的集中安全策略的管理,并解决授权(Authorization)和审计(Audit)。例如,运维管理员可以轻松地为个人用户和组对文件、数据等的访问策略,然后审计对数据源的访问。
与Ranger提供相似功能的还有Cloudera推出的Apache Sentry项目,相比较而言Ranger的功能会更全面一些。
而在认证(Authentication)方面, 一种普遍采用的解决方案是将基于Kerberos的认证方案对接到企业内部的LDAP环境中, Kerberos也是唯一为Hadoop全面实施的验证技术。
另外值得一提的是Apache Knox Gateway项目,与Ranger提高集群内部组件以及用户互相访问的安全不同,Knox提供的是Hadoop集群与外界的唯一交互接口,也就是说所有与集群交互的REST API都通过Knox处理。这样,Knox就给大数据系统提供了一个很好的基于边缘的安全(perimeter-based security)。
基于以上提到的五个安全指标和Hadoop生态圈安全相关的开源项目, 已经足已证明基于Hadoop的大数据平台我们是能够构建一个集中、一致、全面且有效的安全解决方案。
我市再ITjob管网上面找的
RDD,DataFrame和DataSet的区别
RDD、DataFrame和DataSet是容易产生混淆的概念,必须对其相互之间对比,才可以知道其中异同。
RDD和DataFrame
RDD-DataFrame
上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解
Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark
SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的
Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效
率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。
提升执行效率
RDD
API是函数式的,强调不变性,在大部分场景下倾向于创建新对象而不是修改老对象。这一特点虽然带来了干净整洁的API,却也使得Spark应用程序在运
行期倾向于创建大量临时对象,对GC造成压力。在现有RDD
API的基础之上,我们固然可以利用mapPartitions方法来重载RDD单个分片内的数据创建方式,用复用可变对象的方式来减小对象分配和GC的
开销,但这牺牲了代码的可读性,而且要求开发者对Spark运行时机制有一定的了解,门槛较高。另一方面,Spark
SQL在框架内部已经在各种可能的情况下尽量重用对象,这样做虽然在内部会打破了不变性,但在将数据返回给用户时,还会重新转为不可变数据。利用
DataFrame API进行开发,可以免费地享受到这些优化效果。
减少数据读取
分析大数据,最快的方法就是 ——忽略它。这里的“忽略”并不是熟视无睹,而是根据查询条件进行恰当的剪枝。
上文讨论分区表时提到的分区剪 枝便是其中一种——当查询的过滤条件中涉及到分区列时,我们可以根据查询条件剪掉肯定不包含目标数据的分区目录,从而减少IO。
对于一些“智能”数据格 式,Spark
SQL还可以根据数据文件中附带的统计信息来进行剪枝。简单来说,在这类数据格式中,数据是分段保存的,每段数据都带有最大值、最小值、null值数量等
一些基本的统计信息。当统计信息表名某一数据段肯定不包括符合查询条件的目标数据时,该数据段就可以直接跳过(例如某整数列a某段的最大值为100,而查
询条件要求a 200)。
此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存储格式的优势,仅扫描查询真正涉及的列,忽略其余列的数据。
执行优化
人口数据分析示例
为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如
果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter
下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark
SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。
得到的优化执行计划在转换成物 理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。
对于普通开发者而言,查询优化 器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。
RDD和DataSet
DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。
DataSet创立需要一个显式的Encoder,把对象序列化为二进制,可以把对象的scheme映射为SparkSQl类型,然而RDD依赖于运行时反射机制。
通过上面两点,DataSet的性能比RDD的要好很多。
DataFrame和DataSet
Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:
DataSet可以在编译时检查类型
并且是面向对象的编程接口。用wordcount举例:
//DataFrame
// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds
.flatMap(_.split(" ")) // Split on whitespace
.filter(_ != "") // Filter empty words
.toDF() // Convert to DataFrame to perform aggregation / sorting
.groupBy($"value") // Count number of occurences of each word
.agg(count("*") as "numOccurances")
.orderBy($"numOccurances" desc) // Show most common words first
后面版本DataFrame会继承DataSet,DataFrame是面向Spark SQL的接口。
//DataSet,完全使用scala编程,不要切换到DataFrame
val wordCount =
ds.flatMap(_.split(" "))
.filter(_ != "")
.groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
.count()
DataFrame和DataSet可以相互转化, df.as[ElementType] 这样可以把DataFrame转化为DataSet, ds.toDF() 这样可以把DataSet转化为DataFrame。
hive 插入parquet二级分区表数据倾斜优化
单个表每天数据有50亿左右。需用二级分区优化该表。
错误:
Java Heap Space。或者GC overhead limit exceeded。
原因:
Parquet和ORC是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行INSERT语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致mappers或reducers的OOM,具体取决于打开的文件写入器(file writer)的数量。
通过INSERT语句插入数据到动态分区表中,也可能会超过HDFS同时打开文件数的限制。
如果没有join或聚合,INSERT ... SELECT语句会被转换为只有map任务的作业。mapper任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个mapper必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper在运行时所需的内存量随着它遇到的分区数量的增加而增加。
详细原因:
set hive.optimize.sort.dynamic.partition = true,从新跑上述语句。
通过这个优化,这个只有map任务的mapreduce会引入reduce过程,这样动态分区的那个字段比如日期在传到reducer时会被排序。由于分区字段是排序的,因此每个reducer只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写parquet文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。
但reduce阶段一直卡在99%,判断是uiappid数据倾斜导致。验证数据倾斜:
然后你会发现跑得特别慢。开启map group优化(Map端部分聚合,相当于Combiner):
设置上述参数即可。若是其他情况的group优化,可参考hive.groupby.skewindata参数。
有数据倾斜的时候进行负载均衡,当hive.groupby.skewindata设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
分两步:
1、第一步:找出条数大于1亿的uiappid后,select时过滤调这些大的uiappid。通过这个优化过,reduce阶段单个key的数据都不超过1亿条,可以快速得到结果。
2、第二步:再次将uiappid条数大于1亿的数据插入表中。因为大于1亿条的uiappid比较少,可以为每个mapper遇到的分区创建一个文件写入器(file writer)。
parquet(2)读写
1、大多数情况下,我们会使用高级工具来处理parquet文件,比如hive spark impala,不过有时候我们也需要进行低级顺序访问
2、parquet具有一个可插入式的内存数据模型,其作用是要让parquet文件格式更好地与类型广泛的各种工具集成,在java中,这种集成体现在readSupport 和 WriteSupport上
3、parquet写入
3.1、创建(MessageType)schema
3.2、创建parquet message实例 Group group
3.3、创建Groupwritersupport
3.4、创建parquetWriter
3.5、调用parquetwriter的write方法,最后closewriter
4、parquet读文件,更简单,不需要设置文件属性
4.1、创建groupreadsupport
4.2、创建parquetreader
4.3、调用read方法
5、大多数程序更倾向于使用avro、protocol buffers 或者thrift这样的框架来定义数据模型,parquet则迎合了这些需求
6、如avroparquetwriter protoparquetwriter thriftparquetwriter以及其分别对应的writer
上回话周朝东迁洛阳,此回话春秋时代
周王朝所属的每一个封国,都有自己完整的本国史,但是只有鲁国的留传下来,鲁国史称为“春秋”,所以史学家把公元前722年到公元前481年称为春秋时代。
卫国首先于719年政变,政变失败,接下来鲁国政变,宋国政变
宋国国君子与夷跟他的国防总司令孔父嘉是好朋友,孔父嘉的妻子十分美丽,大臣华督见了,立刻神魂颠倒,但是她的身份高贵,华督不能直接抢夺。那时,子与夷堂弟子冯流亡郑国,华督派人跟他联络,恰好孔父嘉积极训练军队准备出猎,华督散步谣言,“孔父嘉每次都被郑国打败”,煽动士兵,士兵祈求华督伸手援救,华督就率领他们攻杀孔父嘉,顺便把子与夷也杀了,子冯到了国君位置,华督得到了孔父嘉的妻子。
因为妻子过于漂亮引来杀身之祸的,孔父嘉是历史上第一人,但是因为美女而引起政权转移,王朝瓦解国家覆灭的却在以后经常出现。
如何将java对象转成parquet文件
把文本文件 直接转 parquet
可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量
压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间
只读取需要的列,支持向量运算,能够获取更好的扫描性能
Parquet就是基于Google的Dremel系统的数据模型和算法实现的。核心思想是使用“record shredding and assembly algorithm”来表示复杂的嵌套数据类型,同时辅以按列的高效压缩和编码技术,实现降低存
与Avro之前新统计系统的日志都是用Avro做序列化和存储,鉴于Parquet的优势和对Avro的兼容,将HDFS上的存储格式改为Paruqet,并且只需做很小的改动就用原读取Avro的API读取Parquet,以提高近一个数量级。
Parquet文件尾部存储了文件的元数据信息和统计信息,自描述的,方便解析
javaparquet的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于、javaparquet的信息别忘了在本站进行查找喔。
发布于:2022-12-15,除非注明,否则均为
原创文章,转载请注明出处。