SparkR

简介: 1. sparkR的简介 SparkR是一个R语言包,它提供了轻量级的方式使得可以在R语言中使用Apache Spark。

1. sparkR的简介

SparkR是一个R语言包,它提供了轻量级的方式使得可以在R语言中使用Apache Spark。在Spark 1.4中,SparkR实现了分布式的data frame,支持类似查询、过滤以及聚合的操作(类似于R中的data frames:dplyr),但是这个可以操作大规模的数据集。

2. 使用spark的两种方式

1.在sparkR的shell中交互式使用

sparkR
2.在R脚本中使用

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sc <- sparkR.init(master = "spark://10.137",sparkEnvir = list(spark.driver.memory="3g"))

3. 纯R语言和SparkR

当数据量很大时,纯R速度就比较慢,无法用到大数据分布式性能,SparkR可以。

注意比较python,pyspark,SparkR等

4. SparkR DataFrame的基本使用

DataFrame是数据组织成一个带有列名称的分布式数据集。在概念上和关系型数据库中的表类似,或者和R语言中的data frame类似,但是这个提供了很多的优化措施。构造DataFrame的方式有很多:可以通过结构化文件中构造;可以通过Hive中的表构造;可以通过外部数据库构造或者是通过现有R的data.frame构造等等。

1.从SparkContext和SQLContext开始

SparkContext是SparkR的切入点,它使得你的R程序和Spark集群互通。你可以通过sparkR.init来构建SparkContext,然后可以传入类似于应用程序名称的选项给它。如果想使用DataFrames,我们得创建SQLContext,这个可以通过SparkContext来构造。如果你使用SparkR shell, SQLContext 和SparkContext会自动地构建好。

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

2.创建DataFrame

如果有SQLContext实例,那么应用程序就可以通过本地的R data frame(或者是Hive表;或者是其他数据源)来创建DataFrames。下面将详细地介绍。

(1)通过本地data.frame构造

最简单地创建DataFrames是将Rdata frame转换成SparkR DataFrames,我们可以通过createDataFrame来创建,并传入本地Rdata.frame以此来创建SparkR DataFrames,下面例子就是这种方法:

user=data.frame(name=c('zhangsan','lisi','wangwu','zhaoliu'),age=c(21,23,20,27))
df <- createDataFrame(sqlContext, user)
(2)通过Data Sources构造

通过DataFrame接口,SparkR支持操作多种数据源,本节将介绍如何通过Data Sources提供的方法来加载和保存数据。你可以阅读Spark SQL编程指南来了解更多的options选项.
Data Sources中创建DataFrames的一般方法是使用read.df,这个方法需要传入SQLContext,需要加载的文件路径以及数据源的类型。SparkR内置支持读取JSON和Parquet文件,而且通过Spark Packages你可以读取很多类型的数据,比如CSV和Avro文件。
下面是介绍如何JSON文件,注意,这里使用的文件不是典型的JSON文件。每行文件必须包含一个分隔符、自包含有效的JSON对象:

people <- read.df(sqlContext, "/wmf/people.json", "json")
head(people)

# SparkR 能自动从Json文件推断schema
printSchema(people)

Data sources API还可以将DataFrames保存成多种的文件格式,比如我们可以通过write.df将上面的DataFrame保存成Parquet文件:

write.df(people, path="people.parquet", source="parquet", mode="overwrite")
(3)通过Hive tables构造

我们也可以通过Hive表来创建SparkR DataFrames,为了达到这个目的,我们需要创建HiveContext,因为我们可以通过它来访问Hive MetaStore中的表。注意,Spark内置就对Hive提供了支持。

hiveContext <- sparkRHive.init(sc)
sql="能在bdcmagic上运行的sql语句"
results<-sql(hiveContext, sql)
head(results)

3.DataFrame的相关操作

(1)选择行和列
#创建一个数据框
user=data.frame(name=c('zhangsan','lisi','wangwu','zhaoliu'),age=c(21,23,20,27))
df <- createDataFrame(sqlContext, user)
#获得数据框的一个基本信息
df
#选择某一列
head(select(df,df$name)) #或者直接使用数据框的列名来选择head(select(df,name))
(2)Grouping和Aggregation
#n操作符其实就是count的意思
head(summarize(groupBy(df, df$sex), count = n(df$sex)))

#数据框的排序
sex_counts=summarize(groupBy(df, df$sex), count = n(df$sex))
head(arrange(sex_counts, desc(sex_counts$count)))
(3)列上面的操作
SparkR提供了大量的函数用于直接对列进行数据处理的操作。
#为数据框增加一列
df$second_age=df$age+10
head(df)
(4)在数据框上使用SQL查询
#创建一个数据框
...

#将数据框注册成表
registerTempTable(df, "people")

#运行sql语句
sql(hiveContext,"sql语句,eg:select * from people")

#过滤,选择满足条件的行
head(filter(df, df$age < 23))
目录
相关文章
|
机器学习/深度学习 人工智能
CF788A Functions again
CF788A Functions again
66 0
|
分布式计算 Java Spark
Optimizing Spark job parameters
Optimizing Spark job parameters
260 0
|
SQL 分布式计算
[SparkR]
words
899 0
[Papers]NSE, $u$, Lorentz space [Bosia-Pata-Robinson, JMFM, 2014]
$$\bex \bbu\in L^p(0,T;L^{q,\infty}),\quad \frac{2}{p}+\frac{3}{q}=1,\quad 3
787 0
[Papers]NSE, $u$, Lorentz space [Bjorland-Vasseur, JMFM, 2011]
$$\bex \int_0^T\frac{\sen{\bbu}_{L^{q,\infty}}^p}{\ve+\ln \sex{e+\sen{\bbu}_{L^\infty}}}\rd s
641 0
[Papers]NSE, $u$, Lorentz space [Sohr, JEE, 2001]
$$\bex \bbu\in L^{p,r}(0,T;L^{q,\infty}(\bbR^3)),\quad\frac{2}{p}+\frac{3}{q}=1,\quad 3
1041 0
[Papers]NSE, $u_3$, Lebesgue space [NNP, QM, 2002; Zhou, JMPA, 2005]
$$\bex u_3\in L^p(0,T;L^q(\bbR^3)),\quad \frac{2}{p}+\frac{3}{q}=\frac{1}{2},\quad 6< q\leq \infty. \eex$$
735 0
|
Python
[Papers]NSE, $\pi$, Lorentz space [Suzuki, JMFM, 2012]
$$\bex \sen{\pi}_{L^{s,\infty}(0,T;L^{q,\infty}(\bbR^3))} \leq \ve_*, \eex$$ with $$\bex \frac{2}{s}+\frac{3}{q}=2,\quad \frac{5}{2}\leq q\leq 3.
651 0
|
Python
[Papers]NSE, $\pi$, Lorentz space [Suzuki, NA, 2012]
$$\bex \sen{\pi}_{L^{s,\infty}(0,T;L^{q,\infty}(\bbR^3))} \leq \ve_*, \eex$$ with $$\bex \frac{2}{s}+\frac{3}{q}=2,\quad 3< q
575 0
[Papers]NSE, $\p_3u$, multiplier spaces [Guo-Gala, ANAP, 2013]
$$\bex \p_3\bbu\in L^\frac{2}{1-r}(0,T;\dot X_r(\bbR^3)),\quad 0\leq r\leq 1. \eex$$
688 0