一、简答题:
①生态系统组件
①Spark Core
提供了Spark最基础与最核心的功能。
②Spark SQL
Spark SQL用于分布式结构化数据的SQL查询与分析,在编写程序中,可以直接使用SQL语句。
③Spark Streaming
Spark Streaming是用于处理流数据的分布式流处理框架,它将数据流以时间片为单位进行分割形成RDD,能够以较小的时间间隔对流数据进行处理,从严格意义上说是一个准实时处理系统。
④Mllib
Mllib是一个分布式机器学习库,在Spark平台上对一些常用的机器学习算法进行了分布式实现,包括:分类、回归、聚类、决策树等。
⑤GraphX
GraphX是一个分布式图处理框架,在Spark上实现了大规模图计算的功能,提供了对图计算和图挖掘的各种接口。
②hadoop与Spark运行架构的区别
MapReduce | Spark | |
数据存储机构 | 磁盘HDFS文件系统 | 使用内存构建弹性分布式数据集。RDD对数据进行运算和缓存 |
编程范式 | Map+Reduce | DAG(有向无环图) |
中间结果存储 | 中间结果落地磁盘,IO及序列化反序列化 | 代价比较大 |
运行方式 | Task以进程方式维护,任务启动慢 | Task以程方式维护,任务启动快 |
二、程序题
围绕spraksql展开
//创建SQLContext
val sqlContext = new SQLContext(sc)
//从指定的地址创建RDD
val lineRDD = sc.textFile(args(0)).map(_.split(" "))
// 读入键值对形式
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)).toDF()
//求平均值
rdd,mapValues(x=>(x,1)).
reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).
mapValues(x=>(x._1/x._2)).
collect()
//排序
df.sort(df(“age”).asc).show()
//绘制流程图表示步骤后变化
三、改错题
二次排序过程及函数改错
class SecondSortKey(val first:Int,val second:Int ) extends Ordered[SecondSortKey] with Serializable {
override def compare(that: SecondSortKey): Int = {
//首先判断第一个列中的数是否相等
if(this.first-that.first != 0){
return this.first-that.first
}else{ //如果不相等,就执行else
return this.second-that.second
}
}
}
四、
隐式操作
导入sqlContext.implicits._来实现RDD到Dataframe的隐式转换。
读取json文件
写文件
textFile.saveAsTextFile(“目录”)
五、填空题
删除线为填空部分
//注册表
personDF.createOrReplaceTempView (“t_person”)
//将RDD映射到rowRDD
val rowRDD = personRDD.map(p => Row(p(0).toInt , p(1).trim, p(2).toInt))
//通过StructType直接指定每个字段的schema
val schema = StructType(
List(
StructField(“id”, IntegerType , true),
StructField(“name”, StringType , true),
StructField(“age”, IntegerType, true)
)
)
//将schema信息应用到rowRDD上
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema )
六、Steaming
五个步骤
数据抽象是什么&特点是什么
七、特征抽取
①词语在语料库中
首先,导入TFIDF所需要的包:
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
接下来,根据SparkContext来创建一个SQLContext,其中sc是一个已经存在的SparkContext;然后导入sqlContext.implicits._来实现RDD到Dataframe的隐式转换。
scala> val sqlContext = new SQLContext(sc) sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@225a9fc6 scala> import sqlContext.implicits._ import sqlContext.implicits._
第三步,创建一个集合,每一个句子代表一个文件。
scala> val sentenceData = sqlContext.createDataFrame(Seq( | (0, "I heard about Spark and I love Spark"), | (0, "I wish Java could use case classes"), | (1, "Logistic regression models are neat") | )).toDF("label", "sentence") sentenceData: org.apache.spark.sql.DataFrame = [label: int, sentence: string]
第四步,用tokenizer把每个句子分解成单词
scala> val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words") tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_494411a37f99 scala> val wordsData = tokenizer.transform(sentenceData) wordsData: org.apache.spark.sql.DataFrame = [label: int, sentence: string, words: array<string>] scala> wordsData.foreach {println} [1,Logistic regression models are neat,WrappedArray(logistic, regression, models, are, neat)] [0,I wish Java could use case classes,WrappedArray(i, wish, java, could, use, case, classes)] [0,I heard about Spark and I love Spark,WrappedArray(i, heard, about, spark, and, i, love, spark)]
从打印结果可以看到,tokenizer的transform()方法把每个句子拆分成了一个个单词。
第五步,用HashingTF的transform()方法把句子哈希成特征向量。我们这里设置哈希表的桶数为2000。
scala> val hashingTF = new HashingTF(). | setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(2000) hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_2591ec73cea0 scala> val featurizedData = hashingTF.transform(wordsData) featurizedData: org.apache.spark.sql.DataFrame = [label: int, sentence: string, words: array<string>, rawFeatures: vector] scala> featurizedData.foreach {println} [1,Logistic regression models are neat,WrappedArray(logistic, regression, models, are, neat),(2000,[65,618,852,992,1194],[1.0,1.0,1.0,1.0,1.0])] [0,I wish Java could use case classes,WrappedArray(i, wish, java, could, use, case, classes),(2000,[103,105,192,774,818,1265,1703],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])] [0,I heard about Spark and I love Spark,WrappedArray(i, heard, about, spark, and, i, love, spark),(2000,[105,365,727,1469,1858,1926],[2.0,2.0,1.0,1.0,1.0,1.0])] scala
我们可以看到每一个单词被哈希成了一个不同的索引值。以”I heard about Spark and I love Spark”为例,输出结果中2000代表哈希表的桶数,“[105,365,727,1469,1858,1926]”分别代表着“i, spark, heard, about, and, love”的哈希值,“[2.0,2.0,1.0,1.0,1.0,1.0]”为对应单词的出现次数。
第六步,调用IDF方法来重新构造特征向量的规模,生成的idf是一个Estimator,在特征向量上应用它的fit()方法,会产生一个IDFModel。
scala> val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") idf: org.apache.spark.ml.feature.IDF = idf_7fcc9063de6f scala> val idfModel = idf.fit(featurizedData) idfModel: org.apache.spark.ml.feature.IDFModel = idf_7fcc9063de6f
同时,调用IDFModel的transform方法,可以得到每一个单词对应的TF-IDF 度量值。
scala> val rescaledData = idfModel.transform(featurizedData) rescaledData: org.apache.spark.sql.DataFrame = [label: int, sentence: string, words: array<string>, rawFeatures: vector, features: vector] scala> rescaledData.select("features", "label").take(3).foreach(println) [(2000,[105,365,1329,1469,1926],[0.28768207245178085,0.6931471805599453,0.693147 1805599453,0.6931471805599453,0.6931471805599453]),0] [(2000,[103,105,192,774,818,1265,1703],[0.6931471805599453,0.28768207245178085,0 .6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.693 1471805599453]),0] [(2000,[65,618,852,992,1194],[0.6931471805599453,0.6931471805599453,0.6931471805 599453,0.6931471805599453,0.6931471805599453]),1]
“[105,365,727,1469,1858,1926]”分别代表着“i, spark, heard, about, and, love”的哈希值。105和365分别代表了“i”和”spark”,其TF-IDF值分别是0.2876820724517808和0.6931471805599453。这两个单词都在第一句中出现了两次,而”i”在第二句中还多出现了一次,从而导致”i”的TF-IDF 度量值较低。因此,与“i”相比,“spark”能更好的区分文档。
②文档相似度、四个参数
val word2Vec = new Word2Vec(). | setInputCol("text"). | setOutputCol("result"). | setVectorSize(3). | setMinCount(0)
③执行模型
八、逻辑回归
①划分数据集
// 70%的数据用于训练模型,30%用于测试
val Array(trainingData, testData) = labelDf.randomSplit(Array(0.7, 0.3), seed)
②设置三个参数
// 建立回归模型,用训练集数据开始训练
val logisticRegression = new LogisticRegression() .setMaxIter(100) .setRegParam(0.02) .setElasticNetParam(0.8)
③模型训练
lrModel = lr.fit(training)