一、数据清洗
学习目标
1.如何使用 SparkSQL 读取 CSV 文件
2.如何使用正则表达式清洗掉多余字符串。
将出租车轨迹数据规整化,清洗掉多余的字符串,并使用 DataFrame.show() 打印输出。
清洗掉红框里面的 $、@ 字符。由于这两个字符出现的次数没有规律,所以需要使用正则表达式进行匹配。
清洗后内容如下:
import org.apache.spark.sql.SparkSession

/**
 * Step 1 - data cleaning.
 *
 * Reads the tab-delimited, header-bearing CSV at `/root/data.csv`, strips every
 * `@` and `$` character from each selected column and prints the result with
 * `DataFrame.show()`.
 */
object Step1 {

  /**
   * Entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Step1").master("local").getOrCreate()
    // try/finally so the session is stopped even when reading or querying fails.
    try {
      /**********begin**********/
      val frame = spark.read.option("header", true).option("delimiter", "\t").csv("/root/data.csv")
      // createOrReplaceTempView: does not throw if a view named "data" already
      // exists on a session returned by getOrCreate().
      frame.createOrReplaceTempView("data")

      // Removes all '@' and '$' characters. Null fields are passed to the UDF
      // as null, so they are mapped to null instead of dereferenced.
      // Inside a character class '$' is a literal, so one pass covers both characters.
      spark.udf.register("cleanData", (x: String) => {
        Option(x).map(_.replaceAll("[@$]+", "")).orNull
      })

      spark.sql(
        """
          |select cleanData(TRIP_ID) as TRIP_ID,cleanData(CALL_TYPE) as CALL_TYPE,cleanData(ORIGIN_CALL) as ORIGIN_CALL,
          |cleanData(TAXI_ID) as TAXI_ID,cleanData(ORIGIN_STAND) as ORIGIN_STAND ,cleanData(TIMESTAMP) as TIMESTAMP,
          |cleanData(POLYLINE) as POLYLINE
          |from data
        """.stripMargin).show()
      /**********end**********/
    } finally {
      spark.stop()
    }
  }
}
二、数据分析
使用SparkSQL完成数据分析
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

/**
 * Step 2 - data analysis with Spark SQL.
 *
 * Reads the tab-delimited, header-bearing CSV at `/root/data2.csv` and builds a
 * chain of temp views:
 *  - `data2`: adds `TIME`, the `TIMESTAMP` column formatted as `yyyy-MM-dd`;
 *  - `data3`: adds `startLocation` / `endLocation`, the first and last element
 *    of the JSON array stored in `POLYLINE`;
 *  - `data4`: adds `timeLen` = (number of points - 1) * 15;
 * and finally prints the number of rows per `CALL_TYPE` and `TIME`.
 */
object Step2 {

  /**
   * Entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // The app name was a copy-paste of "Step1"; it now matches this job.
    val spark = SparkSession.builder().appName("Step2").master("local").getOrCreate()
    spark.sparkContext.setLogLevel("error")
    // try/finally so the session is stopped even when a query fails.
    try {
      /**********begin**********/
      val frame = spark.read.option("header", true).option("delimiter", "\t").csv("/root/data2.csv")
      frame.createOrReplaceTempView("data")

      // 1. Convert the unix timestamp into a date string.
      spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE, from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME from data").createOrReplaceTempView("data2")
      spark.sql("select * from data2").show()

      // 2. Split POLYLINE into startLocation and endLocation.
      // JSON.parseArray can return null (null/blank input) and the array can be
      // empty ("[]"); both cases yield a null location instead of an exception.
      spark.udf.register("startLocation", (x: String) => {
        Option(x)
          .flatMap(s => Option(JSON.parseArray(s)))
          .filter(_.size() > 0)
          .map(_.get(0).toString)
          .orNull
      })
      spark.udf.register("endLocation", (x: String) => {
        Option(x)
          .flatMap(s => Option(JSON.parseArray(s)))
          .filter(_.size() > 0)
          .map(points => points.get(points.size() - 1).toString)
          .orNull
      })
      spark.sql(
        """
          |select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE,TIME,startLocation(POLYLINE) as startLocation,endLocation(POLYLINE) as endLocation from data2
        """.stripMargin).createOrReplaceTempView("data3")
      spark.sql("select * from data3").show()

      // 3. Trip duration, defined as (number of points - 1) * 15 seconds.
      //    E.g. a POLYLINE with 101 points lasts (101 - 1) * 15 = 1500 seconds.
      // Returning Option[Int] gives a nullable column: an unparsable/null
      // POLYLINE produces null rather than a NullPointerException.
      // NOTE(review): an empty array still yields -15 per the formula above;
      // confirm whether such rows should be clamped to 0 or filtered out.
      spark.udf.register("timeLen", (x: String) => {
        Option(x)
          .flatMap(s => Option(JSON.parseArray(s)))
          .map(points => (points.size() - 1) * 15)
      })
      // data3 already carries startLocation/endLocation, so they are selected
      // as plain columns here instead of re-running both UDFs on every row.
      spark.sql(
        """
          |select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,POLYLINE,TIME,startLocation,endLocation,timeLen(POLYLINE) as timeLen from data3
        """.stripMargin).createOrReplaceTempView("data4")
      spark.sql("select * from data4").show()

      // 4. Count the calls of each type per day, ordered by CALL_TYPE then TIME ascending.
      spark.sql(
        """
          |select CALL_TYPE ,TIME,count(1) as num from data4 group by TIME,CALL_TYPE order by CALL_TYPE,TIME
        """.stripMargin).show()
      /**********end**********/
    } finally {
      spark.stop()
    }
  }
}
三、出租车轨迹图表展示
使用springboot + echarts 编写一个展示的图表程序:
为此你需要了解:可视化程序分为前端和后端两部分,下面按照 MVC 设计模式(Model、View、Controller)依次给出各层的代码:
M层:
MainMapper
package net.educoder.app.mapper; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Select; import java.util.List; @Mapper public interface MainMapper { //参考 @Select("SELECT _num from taxi_trend WHERE _taxi = #{type} ORDER BY _time") List<Integer> findTaxiTrendNumByType(String type); /**********begin**********/ @Select("SELECT _time FROM taxi_trend GROUP BY _time ") List<String> findTaxiTrendTime(); @Select("select _taxi from taxi_trend group by _taxi") List<String> findTaxiType(); @Select("SELECT _type from taxi_servicenum GROUP BY _type") List<String> findTaxiPlatform(); @Select("SELECT _serviceType FROM taxi_servicenum GROUP BY _serviceType ORDER BY _serviceType") List<String> findAllTaxiService(); @Select("SELECT _num FROM taxi_servicenum WHERE _type = #{Platform} order BY _serviceType ") List<Integer> findServiceNumByPlatform(String Platform); /**********end**********/ }
V层:
index.html
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title></title>
    <!-- Libraries are loaded in the head so they are available to the inline script below. -->
    <script src="echarts.min.js"></script>
    <script src="jquery-3.1.1.min.js"></script>
</head>
<body>
<div id="main" style="width: 1000px;height:600px;"></div>
<div id="main2" style="width: 1000px;height:600px;"></div>
<script>
    // Line chart: x-axis labels and series both come from POST /Line_Chart.
    var myChart = echarts.init(document.getElementById('main'));
    $.ajax({
        /**********begin**********/
        url: "/Line_Chart",
        /**********end**********/
        success: function (data) {
            // Declared locally: the original assigned to an implicit global
            // shared by both AJAX callbacks.
            var option = {
                title: {
                    text: '各出租车平台年使用率'
                },
                tooltip: {
                    trigger: 'axis'
                },
                legend: {
                    data: ['A', 'B', 'C']
                },
                grid: {
                    left: '3%',
                    right: '4%',
                    bottom: '3%',
                    containLabel: true
                },
                toolbox: {
                    feature: {
                        saveAsImage: {}
                    }
                },
                xAxis: {
                    type: 'category',
                    boundaryGap: false,
                    /**********begin**********/
                    data: data.timeList
                    /**********end**********/
                },
                yAxis: {
                    type: 'value'
                },
                /**********begin**********/
                series: data.resultData
                /**********end**********/
            };
            myChart.setOption(option);
        },
        dataType: "json",
        type: "post"
    });

    // Radar chart: legend, indicators and series come from POST /Radar_Chart.
    var myChart2 = echarts.init(document.getElementById('main2'));
    $.ajax({
        /**********begin**********/
        url: "/Radar_Chart",
        /**********end**********/
        success: function (data) {
            var option = {
                title: {
                    text: '各平台各服务数量'
                },
                tooltip: {},
                legend: {
                    /**********begin**********/
                    // The controller publishes the platform list as "legendData";
                    // "taxiPlatform" (the key read originally) is kept as a fallback.
                    data: data.legendData || data.taxiPlatform
                    /**********end**********/
                },
                radar: {
                    name: {
                        textStyle: {
                            color: '#fff',
                            backgroundColor: '#999',
                            borderRadius: 3,
                            padding: [3, 5]
                        }
                    },
                    /**********begin**********/
                    indicator: data.indicator
                    /**********end**********/
                },
                series: [{
                    type: 'radar',
                    /**********begin**********/
                    data: data.resultData
                    /**********end**********/
                }]
            };
            myChart2.setOption(option);
        },
        dataType: "json",
        type: "post"
    });
</script>
</body>
</html>
C层:
MainController :
package net.educoder.app.controller; import net.educoder.app.entity.Chart_Line; import net.educoder.app.entity.Chart_Radar; import net.educoder.app.mapper.MainMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @Controller public class MainController { /**********begin**********/ @Autowired MainMapper mainMapper; @RequestMapping("/index") public String index() { return "index"; } @RequestMapping("/Line_Chart") @ResponseBody public Map<String, Object> Line_Chart() { List<String> taxiType = mainMapper.findTaxiType(); Map<String, Object> map = new HashMap<>(); List<Chart_Line> resultList = new ArrayList<>(); for (String s : taxiType) { List<Integer> list = mainMapper.findTaxiTrendNumByType(s); Chart_Line chart_line = new Chart_Line(s, "line", list); resultList.add(chart_line); } List<String> taxiTrendTimeList = mainMapper.findTaxiTrendTime(); map.put("timeList", taxiTrendTimeList); map.put("resultData", resultList); return map; } @RequestMapping("/Radar_Chart") @ResponseBody public Map<String, Object> Radar_Chart() { Map<String, Object> map = new HashMap<>(); List<String> allTaxiService = mainMapper.findAllTaxiService(); List<HashMap<String, Object>> indicatorList = new ArrayList<>(); for (String s : allTaxiService) { HashMap<String, Object> stringIntegerHashMap = new HashMap<>(); stringIntegerHashMap.put("name", s); stringIntegerHashMap.put("max", 100); indicatorList.add(stringIntegerHashMap); } List<String> taxiPlatform = mainMapper.findTaxiPlatform(); List<Chart_Radar> resultList = new ArrayList<>(); for (String s : taxiPlatform) { List<Integer> serviceNumByPlatform = mainMapper.findServiceNumByPlatform(s); Chart_Radar chart_radar = new Chart_Radar(s, serviceNumByPlatform); 
resultList.add(chart_radar); } map.put("resultData", resultList); map.put("legendData", taxiPlatform); map.put("indicator", indicatorList); return map; } /**********end**********/ }