「javaspa」JavaSpark判断RDD是否为空

博主:adminadmin 2022-12-17 08:21:11 61

今天给各位分享javaspa的知识,其中也会对JavaSpark判断RDD是否为空进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

在Eclipse中跑一个Spark的算Pi例子。报错如下:

你的spark集群的主机的hosts列表中有master对应的ip吗,看看/etc/hosts中有没有一行,如果spark的主机也就是master是本地机器,就用.setMaster("spark://localhost:7077"):

好了;

建议还是用Idea来编spark程序

如何配置Eclipse来使用Java编写Spark App

1. 环境准备

Eclipse 请不要使用最新的 Neon(4.6) ,太多Bug了。 还是使用最新的 Mars(4.5) 系列吧

JDK 版本8.x (Linux推荐Oracle, 没有测试过OpenJDK)

因为只是用Java,因此无需安装Scala及其相应的插件

2. 创建一个新的maven项目

3. pom.xml

dependency

groupIdorg.apache.hadoop/groupId

artifactIdhadoop-client/artifactId

version2.6.0/version

exclusions

exclusion

groupIdjavax.servlet/groupId

artifactId*/artifactId

/exclusion

/exclusions

/dependency

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-core_2.10/artifactId

version1.6.1/version

/dependency

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-streaming_2.10/artifactId

version1.6.1/version

/dependency

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-mllib_2.10/artifactId

version1.6.1/version

/dependency

dependency

groupIdorg.scala-lang/groupId

artifactIdscala-library/artifactId

version2.10.5/version

/dependency

这里只列出了必要的几个依赖。 其他的请根据你自己的项目需求添加

4. 使用maven打包

之后在Goal 之中填写clean package 即可

如果在修改了与Spark相关的代码之后,需要重新编译打包才行。 否则会有异常提示。

5. 代码编写注意事项:

在创建JavaSparkContext的时候,需要把自己加进去。

public static JavaSparkContextgetContext(String taskName) {

JavaSparkContextsc = new JavaSparkContext(SparkConnUtils.getSparkConf(taskName));

sc.addJar("target/sparkstat-0.0.1-SNAPSHOT.jar");

return sc;

}

其中target/sparkstat-0.0.1-SNAPSHOT.jar是maven 运行之后生成的完整的jar包

Spark SQL怎么创建编程创建DataFrame

创建 SQLContext

Spark SQL 中所有相关功能的入口点是 SQLContext 类或者它的子类, 创建一个 SQLContext 的所有需要仅仅是一个 SparkContext。

使用 Scala 创建方式如下:

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

使用 Java 创建方式如下:

JavaSparkContext sc = ...; // An existing JavaSparkContext.

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

使用 Python 创建方式如下:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

除了一个基本的 SQLContext,你也能够创建一个 HiveContext,它支持基本 SQLContext 所支持功能的一个超集。它的额外的功能包括用更完整的 HiveQL 分析器写查询去访问 HiveUDFs 的能力、 从 Hive 表读取数据的能力。用 HiveContext 你不需要一个已经存在的 Hive 开启,SQLContext 可用的数据源对 HiveContext 也可用。HiveContext 分开打包是为了避免在 Spark 构建时包含了所有 的 Hive 依赖。如果对你的应用程序来说,这些依赖不存在问题,Spark 1.3 推荐使用 HiveContext。以后的稳定版本将专注于为 SQLContext 提供与 HiveContext 等价的功能。

用来解析查询语句的特定 SQL 变种语言可以通过 spark.sql.dialect 选项来选择。这个参数可以通过两种方式改变,一种方式是通过 setConf 方法设定,另一种方式是在 SQL 命令中通过 SET key=value 来设定。对于 SQLContext,唯一可用的方言是 “sql”,它是 Spark SQL 提供的一个简单的 SQL 解析器。在 HiveContext 中,虽然也支持”sql”,但默认的方言是 “hiveql”,这是因为 HiveQL 解析器更完整。

win10 sparkcontext初始化出现语法错误

SparkContext是程序执行的入口,一个SparkContext代表一个应用,深入理解spark运行时机制,首先要了解SparkContext初始化过程。

SparkContext初始化

SparkContext的定义

构造参数为SparkConf,其存储spark相关的配置信息,查看SparkConf定义

SparkConf内部用ConcurrentHashMap存储各种配置信息,初始化时会加载所有以spark.开头的环境变量。

SparkContext初始化

查看初始化对应代码

clone SparkConf变量,接着判断spark.master和spark.app.name是否存在,如果是YARN cluster模式则必须设置spark.yarn.app.id,然后是driver的host,port信息,最后是jars和files,接着查看

_eventLogDir是否记录运行时信息,由spark.eventLog.enabled和spark.eventLog.dir控制,以及是否压缩该信息_eventLogCodec,spark.externalBlockStore.folderName设置运行时本地存储的目录名,为“spark-” + randomUUID.toString(),如果为yarn-client模式,设置SPARK_YARN_MODE=true,使用JobProgressListener跟踪运行时信息,用于UI展示,最后创建SparkEnv对象,创建SparkEnv的过程涉及到非常多spark-core中的核心类。

查看createSparkEnv

继续查看SparkEnv.createDriverEnv

获取host和port信息,调用create方法。

继续查看create方法

创建安全相关的SecurityManager,通过spark.authenticate配置

创建基于akka的分布式消息系统,中间的创建过程略多,这里不再描述,最后看看SparkEnv初始化了哪些对象

上图中列出的对象几乎涵盖了spark-core的核心类,后续会对其分别分析,至此,SparkEnv创建完毕。

接着上面SparkEnv.set(_env)(限于篇幅,关于UI方面的代码可能会被略掉),_metadataCleaner使用TimerTask定期清理persistentRdd,读取hadoop配置,将jar和file的路径添加到rpcEnv的fileServer,读取Executor相关变量,重要的参数为ExecutorMemory

接着,_heartbeatReceiver是默认基于netty实现的心跳机制,创建schedulerBackend用于提交任务,创建taskScheduler和dagScheduler,获取applicationId,启动度量系统,获取eventLogger

executorAllocationManager关于Executor动态资源分配,通过spark.dynamicAllocation.enabled设置,创建contextcleaner用于清理过期的RDD, shuffle和broadcast ,启动ListenerBus,并post环境信息和应用信息,最后添加确保context停止的hook,至此整个sparkcontext的初始化流程结束

---------------------

作者:Mr_JieLQ

来源:CSDN

原文:

版权声明:本文为博主原创文章,转载请附上博文链接!

如何通过Spark的Shell操作SparkContext实例过程

Spark的交互式脚本是一种学习API的简单途径,也是分析数据集交互的有力工具。

Spark抽象的分布式集群空间叫做Resilient Distributed Dataset (RDD)弹性数据集。

其中,RDD有两种创建方式:

(1)、从Hadoop的文件系统输入(例如HDFS);

(2)、有其他已存在的RDD转换得到新的RDD;

下面进行简单的测试:

1. 进入SPARK_HOME/bin下运行命令:

[java] view plain copy print?

$./spark-shell

2. 利用HDFS上的一个文本文件创建一个新RDD:

[java] view plain copy print?

scala var textFile = sc.textFile("hdfs://localhost:50040/input/WordCount/text1");

[java] view plain copy print?

textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12

3. RDD有两种类型的操作 ,分别是Action(返回values)和Transformations(返回一个新的RDD)

(1)Action相当于执行一个动作,会返回一个结果:

java 怎么查询hive创建spark rdd

查询hive,返回结果

将返回结果放到spark rdd

例如:

JavaSparkContext sc = new JavaSparkContext(conf);    

ListInteger data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

JavaRDDInteger distData = sc.parallelize(data);

其中data可以视为从hive查询得到

关于javaspa和JavaSpark判断RDD是否为空的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

The End

发布于:2022-12-17,除非注明,否则均为首码项目网原创文章,转载请注明出处。