「sparkjava」sparkjava是什么
今天给各位分享sparkjava的知识,其中也会对sparkjava是什么进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
本文目录一览:
- 1、spark JavaPairRDD 怎么修改JavaPairRDD对象中的一个key或者value的值?
- 2、java oracle sql spark什么关系
- 3、如何将java代码在spark运行
- 4、spark不提供哪种语言的api
- 5、Spark 中用 Scala 和 java 开发有什么区别
- 6、java的怎么操作spark的dataframe
spark JavaPairRDD 怎么修改JavaPairRDD对象中的一个key或者value的值?
如果key或者value是基本数据类型,那么要用map类算子生成一个新的JavaPairRDD;如果key或者value是对象类型,那么出了上述方法,也可以使用foreach类算子直接修改key或value的值。
java oracle sql spark什么关系
java是一种编程语言,用来开发软件程序的。
oracle是一种数据库,用来进行数据的存储和管理的软件;
sql是一种数据库查询语言,用来对数据库中的数据进行各种操作的。
java可以通过sql语句对oracle中存储的数据进行操作。
spark不了解。
如何将java代码在spark运行
我们首先提出这样一个简单的需求:
现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况。这里我拿我网站的日志记录行示例,如下所示:
1
121.205.198.92
- - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
2
121.205.198.92
- - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
3
121.205.198.92
- - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
4
121.205.198.92
- - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
5
121.205.241.229
- - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
6
121.205.241.229
- - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
Java实现Spark应用程序(Application)
我们实现的统计分析程序,有如下几个功能点:
从HDFS读取日志数据文件
将每行的第一个字段(IP地址)抽取出来
统计每个IP地址出现的次数
根据每个IP地址出现的次数进行一个降序排序
根据IP地址,调用GeoIP库获取IP所属国家
打印输出结果,每行的格式:[国家代码] IP地址 频率
下面,看我们使用Java实现的统计分析应用程序代码,如下所示:
001
package org.shirdrn.spark.job;
002
003
import java.io.File;
004
import java.io.IOException;
005
import java.util.Arrays;
006
import java.util.Collections;
007
import java.util.Comparator;
008
import java.util.List;
009
import java.util.regex.Pattern;
010
011
import org.apache.commons.logging.Log;
012
import org.apache.commons.logging.LogFactory;
013
import org.apache.spark.api.java.JavaPairRDD;
014
import org.apache.spark.api.java.JavaRDD;
015
import org.apache.spark.api.java.JavaSparkContext;
016
import org.apache.spark.api.java.function.FlatMapFunction;
017
import org.apache.spark.api.java.function.Function2;
018
import org.apache.spark.api.java.function.PairFunction;
019
import org.shirdrn.spark.job.maxmind.Country;
020
import org.shirdrn.spark.job.maxmind.LookupService;
021
022
import scala.Serializable;
023
import scala.Tuple2;
024
025
public class IPAddressStats implements Serializable
{
026
027
private static final long serialVersionUID
= 8533489548835413763L;
028
private static final Log
LOG = LogFactory.getLog(IPAddressStats.class);
029
private static final Pattern
SPACE = Pattern.compile("
");
030
private transient LookupService
lookupService;
031
private transient final String
geoIPFile;
032
033
public IPAddressStats(String
geoIPFile) {
034
this.geoIPFile
= geoIPFile;
035
try {
036
//
lookupService: get country code from a IP address
037
File
file = new File(this.geoIPFile);
038
LOG.info("GeoIP
file: " +
file.getAbsolutePath());
039
lookupService
= new AdvancedLookupService(file,
LookupService.GEOIP_MEMORY_CACHE);
040
} catch (IOException
e) {
041
throw new RuntimeException(e);
042
}
043
}
044
045
@SuppressWarnings("serial")
046
public void stat(String[]
args) {
047
JavaSparkContext
ctx = new JavaSparkContext(args[0], "IPAddressStats",
048
System.getenv("SPARK_HOME"),
JavaSparkContext.jarOfClass(IPAddressStats.class));
049
JavaRDDString
lines = ctx.textFile(args[1], 1);
050
051
//
splits and extracts ip address filed
052
JavaRDDString
words = lines.flatMap(new FlatMapFunctionString,
String() {
053
@Override
054
public IterableString
call(String s) {
055
//
121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101
Firefox/11.0"
056
//
ip address
057
return Arrays.asList(SPACE.split(s)[0]);
058
}
059
});
060
061
//
map
062
JavaPairRDDString,
Integer ones = words.map(new PairFunctionString,
String, Integer() {
063
@Override
064
public Tuple2String,
Integer call(String s) {
065
return new Tuple2String,
Integer(s, 1);
066
}
067
});
068
069
//
reduce
070
JavaPairRDDString,
Integer counts = ones.reduceByKey(new Function2Integer,
Integer, Integer() {
071
@Override
072
public Integer
call(Integer i1, Integer i2) {
073
return i1
+ i2;
074
}
075
});
076
077
ListTuple2String,
Integer output = counts.collect();
078
079
//
sort statistics result by value
080
Collections.sort(output, new ComparatorTuple2String,
Integer() {
081
@Override
082
public int compare(Tuple2String,
Integer t1, Tuple2String, Integer t2) {
083
if(t1._2
t2._2) {
084
return 1;
085
} else if(t1._2
t2._2) {
086
return -1;
087
}
088
return 0;
089
}
090
});
091
092
writeTo(args,
output);
093
094
}
095
096
private void writeTo(String[]
args, ListTuple2String, Integer output) {
097
for (Tuple2?,
? tuple : output) {
098
Country
country = lookupService.getCountry((String) tuple._1);
099
LOG.info("[" +
country.getCode() + "]
" +
tuple._1 + "\t" +
tuple._2);
100
}
101
}
102
103
public static void main(String[]
args) {
104
//
./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStatsspark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
105
if (args.length
3)
{
106
System.err.println("Usage:
IPAddressStats master inFile GeoIPFile");
107
System.err.println("
Example: org.shirdrn.spark.job.IPAddressStatsspark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
108
System.exit(1);
109
}
110
111
String
geoIPFile = args[2];
112
IPAddressStats
stats = new IPAddressStats(geoIPFile);
113
stats.stat(args);
114
115
System.exit(0);
116
117
}
118
119
}
spark不提供哪种语言的api
LISP语言。
Scala是Spark的主要编程语言,但Spark还支持Java、Python、R作为编程语言。LISP是一种通用高级计算机程序语言,长期以来垄断人工智能领域的应用。LISP作为应用人工智能而设计的语言,是第一个声明式系内函数式程序设计语言,有别于命令式系内过程式的C、Fortran和面向对象的Java等结构化程序设计语言。
Spark 中用 Scala 和 java 开发有什么区别
Scala到底是什么?在目前众多的JVM语言当中,Scala无疑是最引人注意的语言之一。Scala是一个静态语言,更适合大型工程项目,Scala直接编译成Java字节码,性能接近Java。Scala是一个多范式的语言,你可以混合使用函数式和面向对象编程,混合使用可变类和不变类,混合使用Actor和传统的Java并发库。
短短一个月的时间,Scala于本月冲进了TIOBE的前五十名。一个 Twitter 的开发人员说过,Scala 将会成为现代 Web2.0 的发起语言。LinkedIn 也用这种语言。同样许多其他大的公司如 Sony Picture, EDF, SAP 也开始使用这种语言。为什么Scala发展这么迅猛,可以获得如此热烈的社区支持。
曾冠东还表示,Scala不是Java的杀手,它无法取代Java的地位,也突破不了JVM的限制、Java实现不了的功能它也实现不了。我们可以将Scala形象的理解成大量语法糖的Java。
Scala 开发团队发布了最新的2.9.2稳定版本,Scala 语言的特性有许多,例如高阶函数和对象、抽象类型绑定,actor 使得函数在 Scala 中能是一个子类成为可能,Scala 中的设计模式使得面向对象和函数编程无缝结合。Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。它已经成功运用在电信行业。Spark 是一种可扩展的数据分析平台,它整合了内存计算的基元,因此,相对于 Hadoop 的集群存储方法,它在性能方面更具优势。Spark 是在 Scala 语言中实现的,并且利用了该语言,为数据处理提供了独一无二的环境。Scala 编译器可以生成字节码,直接运行在使用JVM上。该语言(它实际上代表了可扩展语言)被定义为可直接集成到语言中的简单扩展。
Scala作为一门静态语言,它的主要特性有哪些?
· Scala是面向对象的
Scala是一个纯面向对象语言,在某种意义上来讲所有数值都是对象。对象的类型和行为是由class和trait来描述的。Class的抽象可由子类化和一种灵活的基于mixin的组合机制(它可作为多重继承的简单替代方案)来扩展。
· Scala是函数式的
Scala还是一个函数式语言,在某种意义上来讲所有函数都是数值。Scala为定义匿名函数提供了一种轻量级的语法,它支持高阶(higher-order)函数、允许函数嵌套、支持局部套用(currying)。Scala的case类及其内置支持的模式匹配模型代数类型在许多函数式编程语言中都被使用。
· Scala是静态类型的
Scala配备了一套富有表现力的类型系统,该抽象概念以一种安全的和一致的方式被使用。
· Scala是可扩展的
Scala的设计承认了实践事实,领域特定应用开发通常需要领域特定语言扩展。Scala提供了一个独特的语言组合机制,这可以更加容易地以类库的形式增加新的语言结构:
任何方式可以被用作中缀(infix)或后缀(postfix)操作符闭包按照所期望的类型(目标类型)自动地被构造
两者结合使用可方便地定义新语句,无需扩展语法,也无需使用类似宏的元编程工具。
· Scala可与Java和.NET进行互操作
Scala设计时就考虑了与流行编程环境良好交互,如Java 2运行时环境(JRE)和 .NET框架(CLR)。特别是与主流面向对象语言,如Java和C#尽量无缝交互。Scala有像Java和C#一样的编译模型(独立编译,动态装载类),允许访问成千上万的高质量类库。
在并发性方面,与 Scala 在 .NET 领域中的姐妹语言 F# 相似,Scala 是针对 “并发性问题” 的解决方案之一,让开发人员能够更加轻松地专注于问题的实质,而不用考虑并发编程的低级细节。Actor 编程模式让高度并行应用程序的开发更加简单。Scala把Erlang风格的基于actor的并发带进了JVM。我们可以利用Scala的actor模型在JVM上设计具伸缩性的并发应用程序,以自动获得多核心处理器带来的优势,而不必依照复杂的Java线程模型来编写程序。Scala 为并发性提供了两种级别的支持,这与其他与 Java 相关的主题极为类似:
首先,对底层库的完全访问(比如说 java.util.concurrent)以及对 “传统” Java 并发性语义的支持(比如说监控程序和wait()/notifyAll())。其次,这些基本机制上面有一个抽象层
Scala 提供了在稳定的高性能平台(Java 虚拟机)上生成的能力同时也是一门敏捷性语言。这一类型的语言也有其他的选择,例如 Jython, JRuby, Groovy 和 Clojure, 但是这些都是运行在 JVM 上的动态类型语言。Open Class 的效果让大家会觉得Scala是动态语言,但它是选择隐式转换来实现的,这也正好证明了Scala是静态语言。隐式转换(Implicit conversion)使 Scala 具有类型安全性,正如扩展方法(extension method)之于 C#,开放类(open class)之于 ruby。即:向未曾定义的类型添加方法(如字符串、列表、整数)。这是使得 Scala 符合 DSL(特定领域语言)模型的特性之一。
Scala结合了面向对象和函数编程的优势,函数编程的一个好处就是你能够像运用一个数据那样运用函数,可以用来定义真正高层级的库,或者去定义新的领域特殊语言(DSL)。
在谈及Java与Scala的对比时,曾冠东表示,Scala能调用绝大部分的Java,而Java调用Scala独有的东西会比较难。Java 拥有非常强的概念规范,因此任何一个 Java 程序之间具有非常多的相似之处,并且这样能够方便的进行程序员交替。但是 Scala 并没有这样的统一性,因为这是一门很有表现力的语言。现场曾冠东为我们演示了实际案例,如下图所示:
正所谓,金无足赤,人无完人。Scala对二进制不兼容,语法也越来越复杂,不能突破Bytecode的限制、编译速度有所缓慢。当它被广泛用于单元测试、开发工具、Socket开发、以及面对多核挑战的并发应用。总而言之,Scala是一种函数式面向对象语言,它融汇了许多前所未有的特性,而同时又运行于JVM之上。正如JRuby 创建者之一Charles Nutter 所宣称的那样Scala就是 Java 王位的合法继承人。随着开发者对Scala的兴趣日增,以及越来越多的工具支持,无疑Scala语言将成为广大软件工程师手上一件必不可少的工具。
java的怎么操作spark的dataframe
t java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
public class Demo_Mysql3 {
private static Logger logger = Logger.getLogger(Demo_Mysql2.class);
public static void main(String[] args) {
关于sparkjava和sparkjava是什么的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
发布于:2022-11-27,除非注明,否则均为
原创文章,转载请注明出处。