开发者学堂课程【大数据 Spark2020版(知识精讲与实战演练)第四阶段:工程搭建_读取数据】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/691/detail/12108
工程搭建_读取数据
创建工程后进行数据处理,数据处理要进行数据读取
数据读取
步骤:
1.创建文件
创建 Spark Application 主类 cn.itcast.taxi.TaxiAnalysisRunner
package cn.itcast.taxi
object TaxiAnalysisRunner {
def main (args : Array [String] ):Unit = {
}
}
2.数据读取
数据读取之前要做两件事
(1)初始化环境,导入必备的一些包
(2)在工程根目录中创建 dataset 文件夹,并拷贝数据集进去
代码如下
object TaxiAnalysisRunner {
def main (args : Array [String] ): Unit = {
//1.创建 SparkSession
val spark = SparkSession . builder()
.master ( "local[6]")
.appName( "taxi")
.getorCreate( )
//2.导入函数和隐式转换
import spark.implicits._
import org.apache.spark.sql.functions._
//3.读取文件
val taxiRaw = spark.read
.option ( "header" , value = true)
.csv ( "dataset/half_trip.csv")
taxiRaw.show ( )
taxiRaw. printSchema ( )}
}
}
每一步代码都是完整的代码,对于重复的代码会省略掉
3.运行结果如下
root
|--medallion: string (nullable = true)
|--hack_license: string (nullable = true)
|--vendor_id : string (nullable = true)
|--rate_code: string (nullable = true)
|--store_and_fwd_flag: string (nullable = true)
|--pickup_datetime: string (nullable = true) |--dropoff_datetime: string (nullable = true)
|--passenger_count: string (nullable = true)- |--trip_time_in_secs : string (nullable = true)
|--trip_distance: string (nullable = true)
|--pickup_longitude: string (nullable = true)
|--pickup_latitude: string (nullable = true)
|--dropoff_longitude: string (nullable = true)
|--dropoff_latitude: string (nullable = true)
4.下一步
(1)剪去多余列
现在数据集中包含了一些多余的列,在后续的计算中并不会使用到,如果让这些列参与计算的话,会影响整体性能,浪费集群资源
(2)类型转换
可以看到,现在的数据集中,所有列类型都是 String , 而在一些统计和运算中,不能使用 String 来进行,所以要将这些数据转为对应的类型
另外一个代码是 trip 类,前面暂时省略
Trip 是一个强类型的样例类,一个Trip对象代表一个出租车行程,使用 Trip 可以对应数据集中的一条记录
object TaxiAnalysisRunner {
def main (args : Array [String]) : Unit = {
//此处省略Main方法中内容
}
}
/**
*代表一个行程,是集合中的一条记录
*@param license出租车执照号
*@param pickUpTime上车时间
*@param dropoffTime下车时间
*@param pickUpX上车地点的经度
* @param pickUpY上车地点的纬度
*@param drop0ffX下车地点的经度
* @param drop0ffY下车地点的纬度
*/
case class Trip(
license: String,
pickUpTime: Long,
drop0ffTime: Long,
pickUpX: Double,
pickUpY: Double,
dropoffX: Double,
dropoffY: Double
)
看到整体代码结构
类改名字,改为 TaxiAnalysisRunner
创建目录,dataset,进入 spark 课程 files、Dataset、half_tripcsv,将 half_tripcsv 文件拷到 dataset 目录下
执行步骤
第一步创建 SparkSession,第二步导入隐式转换和函数们,第三步数据读取
package cn.itcast.taxi
import org.apache.spark.sql.SparkSession
class TaxiAnalysisRunner {
def main(args : Array [string]): unit = {
//1.创建 sparksession
val spark = sSparksession.builder()
.master( master ="local[ 6]")
.appName( name = "taxi")
.getorcreate()
//2.导入隐式转换和函数们
import spark.implicits._
import org.apache.spark.sql.functions._
(数据格式是 csv,taxiRaw .show() 与 taxiRaw.printschema()
查看数据构成与数据结构)
//3.数据读取
val taxiRaw = spark.read
.option( "header" , value = true)
.csv( path = "dataset/half_trip.csv" )
taxiRaw .show()
taxiRaw.printschema()
运行将 class 改为 object
object TaxiAnalysisRunner {
数据集已读取
数据比想象中的多,数据越多,读取数据的速度越慢,处理速度越慢,内存消耗越大
要删掉一部分内容,只保留需要的列
root
|--medallion: string (nullable = true)
|--hack_license: string (nullable = true)
|--vendor_id : string (nullable = true)
|--rate_code: string (nullable = true)
|--store_and_fwd_flag: string (nullable = true)
|--pickup_datetime: string (nullable = true) |--dropoff_datetime: string (nullable = true)
|--passenger_count: string (nullable = true)- |--trip_time_in_secs : string (nullable = true)
|--trip_distance: string (nullable = true)
|--pickup_longitude: string (nullable = true)
|--pickup_latitude: string (nullable = true)
|--dropoff_longitude: string (nullable = true)
|--dropoff_latitude: string (nullable = true)
pickup_datetime 与 dropoff_datetime 要进行数据转换,转成 long 型,时间有一种写法为格式化写法,为 string,也可以直接使用 data 对象表示时间,long 表示为时间戳。
pickup_longitude、pickup_latitude、dropoff_longitude、dropoff_latitude 应该是一个 Dubbo 型,转化成 Dubbo 型
要删减一部分内容并对数据类型进行转换

