「javaspa」JavaSpark判断RDD是否为空
今天给各位分享javaspa的知识,其中也会对JavaSpark判断RDD是否为空进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
本文目录一览:
- 1、在Eclipse中跑一个Spark的算Pi例子。报错如下:
- 2、如何配置Eclipse来使用Java编写Spark App
- 3、Spark SQL怎么创建编程创建DataFrame
- 4、win10 sparkcontext初始化出现语法错误
- 5、如何通过Spark的Shell操作SparkContext实例过程
- 6、java 怎么查询hive创建spark rdd
在Eclipse中跑一个Spark的算Pi例子。报错如下:
你的spark集群的主机的hosts列表中有master对应的ip吗,看看/etc/hosts中有没有一行,如果spark的主机也就是master是本地机器,就用.setMaster("spark://localhost:7077"):
好了;
建议还是用Idea来编spark程序
如何配置Eclipse来使用Java编写Spark App
1. 环境准备
Eclipse 请不要使用最新的 Neon(4.6) ,太多Bug了。 还是使用最新的 Mars(4.5) 系列吧
JDK 版本8.x (Linux推荐Oracle, 没有测试过OpenJDK)
因为只是用Java,因此无需安装Scala及其相应的插件
2. 创建一个新的maven项目
3. pom.xml
dependency
groupIdorg.apache.hadoop/groupId
artifactIdhadoop-client/artifactId
version2.6.0/version
exclusions
exclusion
groupIdjavax.servlet/groupId
artifactId*/artifactId
/exclusion
/exclusions
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-core_2.10/artifactId
version1.6.1/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-streaming_2.10/artifactId
version1.6.1/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-mllib_2.10/artifactId
version1.6.1/version
/dependency
dependency
groupIdorg.scala-lang/groupId
artifactIdscala-library/artifactId
version2.10.5/version
/dependency
这里只列出了必要的几个依赖。 其他的请根据你自己的项目需求添加
4. 使用maven打包
之后在Goal 之中填写clean package 即可
如果在修改了与Spark相关的代码之后,需要重新编译打包才行。 否则会有异常提示。
5. 代码编写注意事项:
在创建JavaSparkContext的时候,需要把自己加进去。
public static JavaSparkContextgetContext(String taskName) {
JavaSparkContextsc = new JavaSparkContext(SparkConnUtils.getSparkConf(taskName));
sc.addJar("target/sparkstat-0.0.1-SNAPSHOT.jar");
return sc;
}
其中target/sparkstat-0.0.1-SNAPSHOT.jar是maven 运行之后生成的完整的jar包
Spark SQL怎么创建编程创建DataFrame
创建 SQLContext
Spark SQL 中所有相关功能的入口点是 SQLContext 类或者它的子类, 创建一个 SQLContext 的所有需要仅仅是一个 SparkContext。
使用 Scala 创建方式如下:
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
使用 Java 创建方式如下:
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
使用 Python 创建方式如下:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
除了一个基本的 SQLContext,你也能够创建一个 HiveContext,它支持基本 SQLContext 所支持功能的一个超集。它的额外的功能包括用更完整的 HiveQL 分析器写查询去访问 HiveUDFs 的能力、 从 Hive 表读取数据的能力。用 HiveContext 你不需要一个已经存在的 Hive 开启,SQLContext 可用的数据源对 HiveContext 也可用。HiveContext 分开打包是为了避免在 Spark 构建时包含了所有 的 Hive 依赖。如果对你的应用程序来说,这些依赖不存在问题,Spark 1.3 推荐使用 HiveContext。以后的稳定版本将专注于为 SQLContext 提供与 HiveContext 等价的功能。
用来解析查询语句的特定 SQL 变种语言可以通过 spark.sql.dialect 选项来选择。这个参数可以通过两种方式改变,一种方式是通过 setConf 方法设定,另一种方式是在 SQL 命令中通过 SET key=value 来设定。对于 SQLContext,唯一可用的方言是 “sql”,它是 Spark SQL 提供的一个简单的 SQL 解析器。在 HiveContext 中,虽然也支持”sql”,但默认的方言是 “hiveql”,这是因为 HiveQL 解析器更完整。
win10 sparkcontext初始化出现语法错误
SparkContext是程序执行的入口,一个SparkContext代表一个应用,深入理解spark运行时机制,首先要了解SparkContext初始化过程。
SparkContext初始化
SparkContext的定义
构造参数为SparkConf,其存储spark相关的配置信息,查看SparkConf定义
SparkConf内部用ConcurrentHashMap存储各种配置信息,初始化时会加载所有以spark.开头的环境变量。
SparkContext初始化
查看初始化对应代码
clone SparkConf变量,接着判断spark.master和spark.app.name是否存在,如果是YARN cluster模式则必须设置spark.yarn.app.id,然后是driver的host,port信息,最后是jars和files,接着查看
_eventLogDir是否记录运行时信息,由spark.eventLog.enabled和spark.eventLog.dir控制,以及是否压缩该信息_eventLogCodec,spark.externalBlockStore.folderName设置运行时本地存储的目录名,为“spark-” + randomUUID.toString(),如果为yarn-client模式,设置SPARK_YARN_MODE=true,使用JobProgressListener跟踪运行时信息,用于UI展示,最后创建SparkEnv对象,创建SparkEnv的过程涉及到非常多spark-core中的核心类。
查看createSparkEnv
继续查看SparkEnv.createDriverEnv
获取host和port信息,调用create方法。
继续查看create方法
创建安全相关的SecurityManager,通过spark.authenticate配置
创建基于akka的分布式消息系统,中间的创建过程略多,这里不再描述,最后看看SparkEnv初始化了哪些对象
上图中列出的对象几乎涵盖了spark-core的核心类,后续会对其分别分析,至此,SparkEnv创建完毕。
接着上面SparkEnv.set(_env)(限于篇幅,关于UI方面的代码可能会被略掉),_metadataCleaner使用TimerTask定期清理persistentRdd,读取hadoop配置,将jar和file的路径添加到rpcEnv的fileServer,读取Executor相关变量,重要的参数为ExecutorMemory
接着,_heartbeatReceiver是默认基于netty实现的心跳机制,创建schedulerBackend用于提交任务,创建taskScheduler和dagScheduler,获取applicationId,启动度量系统,获取eventLogger
executorAllocationManager关于Executor动态资源分配,通过spark.dynamicAllocation.enabled设置,创建contextcleaner用于清理过期的RDD, shuffle和broadcast ,启动ListenerBus,并post环境信息和应用信息,最后添加确保context停止的hook,至此整个sparkcontext的初始化流程结束
---------------------
作者:Mr_JieLQ
来源:CSDN
原文:
版权声明:本文为博主原创文章,转载请附上博文链接!
如何通过Spark的Shell操作SparkContext实例过程
Spark的交互式脚本是一种学习API的简单途径,也是分析数据集交互的有力工具。
Spark抽象的分布式集群空间叫做Resilient Distributed Dataset (RDD)弹性数据集。
其中,RDD有两种创建方式:
(1)、从Hadoop的文件系统输入(例如HDFS);
(2)、有其他已存在的RDD转换得到新的RDD;
下面进行简单的测试:
1. 进入SPARK_HOME/bin下运行命令:
[java] view plain copy print?
$./spark-shell
2. 利用HDFS上的一个文本文件创建一个新RDD:
[java] view plain copy print?
scala var textFile = sc.textFile("hdfs://localhost:50040/input/WordCount/text1");
[java] view plain copy print?
textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12
3. RDD有两种类型的操作 ,分别是Action(返回values)和Transformations(返回一个新的RDD)
(1)Action相当于执行一个动作,会返回一个结果:
java 怎么查询hive创建spark rdd
查询hive,返回结果
将返回结果放到spark rdd
例如:
JavaSparkContext sc = new JavaSparkContext(conf);
ListInteger data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDDInteger distData = sc.parallelize(data);
其中data可以视为从hive查询得到
关于javaspa和JavaSpark判断RDD是否为空的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
发布于:2022-12-17,除非注明,否则均为
原创文章,转载请注明出处。