Creating Streaming Datasets and DataFrames
Method 1: via a JavaBean
Java implementation:
package com.kfk.spark.structuredstreaming;

import java.sql.Date;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:38 PM
 */
public class DeviceData {

    // device: string, type: string, signal: double, time: DateType
    private String device;
    private String deviceType;
    private double signal;
    private Date deviceTime;

    public DeviceData() {
    }

    public DeviceData(String device, String deviceType, double signal, Date deviceTime) {
        this.device = device;
        this.deviceType = deviceType;
        this.signal = signal;
        this.deviceTime = deviceTime;
    }

    public String getDevice() {
        return device;
    }

    public void setDevice(String device) {
        this.device = device;
    }

    public String getDeviceType() {
        return deviceType;
    }

    public void setDeviceType(String deviceType) {
        this.deviceType = deviceType;
    }

    public double getSignal() {
        return signal;
    }

    public void setSignal(double signal) {
        this.signal = signal;
    }

    public Date getDeviceTime() {
        return deviceTime;
    }

    public void setDeviceTime(Date deviceTime) {
        this.deviceTime = deviceTime;
    }
}
package com.kfk.spark.structuredstreaming;

import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.sql.Date;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper {
    public static void main(String[] args) throws StreamingQueryException {

        SparkSession spark = CommSparkSession.getSparkSession();

        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());

        // convert each input line to a typed Dataset via the JavaBean encoder
        Dataset<DeviceData> dfdevice = dflines.map(new MapFunction<String, DeviceData>() {
            @Override
            public DeviceData call(String value) throws Exception {
                String[] lines = value.split(",");
                return new DeviceData(lines[0], lines[1], Double.parseDouble(lines[2]),
                        Date.valueOf(lines[3]));
            }
        }, Encoders.bean(DeviceData.class));

        // result table: filter before projecting, otherwise the signal column
        // is no longer available to the where clause
        Dataset<Row> dffinal = dfdevice.where("signal > 10")
                .select("device", "deviceType")
                .groupBy("deviceType")
                .count();

        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("update")
                .format("console")
                .start();

        query.awaitTermination();
    }
}
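Every example in this section obtains its SparkSession from a CommSparkSession helper whose source is not shown here. A minimal sketch of what such a helper might look like follows; the app name and the local master are assumptions for illustration, not the original code:

package com.kfk.spark.common;

import org.apache.spark.sql.SparkSession;

public class CommSparkSession {

    // Returns a shared SparkSession. The app name and master below are
    // assumptions; the original helper may configure these differently.
    public static SparkSession getSparkSession() {
        return SparkSession.builder()
                .appName("StructuredStreaming")
                .master("local[2]")
                .getOrCreate();
    }
}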
Method 2: by constructing a Schema
package com.kfk.spark.structuredstreaming;

import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.sql.Date;
import java.util.ArrayList;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper2 {
    public static void main(String[] args) throws StreamingQueryException {

        SparkSession spark = CommSparkSession.getSparkSession();

        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());

        List<StructField> fields = new ArrayList<StructField>();
        StructField device = DataTypes.createStructField("device", DataTypes.StringType, true);
        StructField deviceType = DataTypes.createStructField("deviceType", DataTypes.StringType, true);
        StructField signal = DataTypes.createStructField("signal", DataTypes.DoubleType, true);
        StructField deviceTime = DataTypes.createStructField("deviceTime", DataTypes.DateType, true);

        fields.add(device);
        fields.add(deviceType);
        fields.add(signal);
        fields.add(deviceTime);

        // build the schema
        StructType schema = DataTypes.createStructType(fields);

        // convert each input line to a Dataset<Row> matching the schema
        Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() {
            @Override
            public Row call(String value) throws Exception {
                String[] lines = value.split(",");
                return RowFactory.create(lines[0], lines[1], Double.parseDouble(lines[2]),
                        Date.valueOf(lines[3]));
            }
        }, RowEncoder.apply(schema));

        // result table: filter before projecting so the signal column is still available
        Dataset<Row> dffinal = dfdevice.where("signal > 10")
                .select("device", "deviceType")
                .groupBy("deviceType")
                .count();

        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("update")
                .format("console")
                .start();

        query.awaitTermination();
    }
}
Scala implementation:
package com.kfk.spark.structuredstreaming

import com.kfk.spark.common.CommSparkSessionScala
import java.sql.Date
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 7:31 PM
 */
object StruStreamingDFOperScala {

    case class DeviceData(device: String, deviceType: String, signal: Double, deviceTime: Date)

    def main(args: Array[String]): Unit = {

        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._

        // input table
        val socketDF = spark
            .readStream
            .format("socket")
            .option("host", "bigdata-pro-m04")
            .option("port", 9999)
            .load().as[String]

        // build the schema
        val userSchema = new StructType()
            .add("device", "string")
            .add("deviceType", "string")
            .add("signal", "double")
            .add("deviceTime", "date")

        // convert each line to a Dataset[Row] matching the schema; signal and
        // deviceTime must be parsed to match the declared double and date types
        val df_device = socketDF.map(x => {
            val lines = x.split(",")
            Row(lines(0), lines(1), lines(2).toDouble, Date.valueOf(lines(3)))
        })(RowEncoder(userSchema))

        // convert to a typed Dataset via the case class
        val df: Dataset[DeviceData] = df_device.as[DeviceData]

        // result table
        val df_final = df.groupBy("deviceType").count()

        // output
        val query = df_final.writeStream
            .outputMode("update")
            .format("console")
            .start()

        query.awaitTermination()
    }
}
Test data:
aa,ss,100,2020-12-21
bb,ss,47,2020-12-21
cc,ss,3,2020-12-21
dd,ss,46,2020-12-21
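To feed this data into the socket source, start a netcat listener on the configured host (bigdata-pro-m04 in the examples) before launching the program, then paste the lines above into that terminal one row at a time:

nc -lk 9999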
Output:
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----+
|deviceType|count|
+----------+-----+
|        ss|    3|
+----------+-----+
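The count of 3 corresponds to the Java variants, whose signal > 10 filter drops the cc row (signal 3); the Scala program above does not filter and would report 4. Note also that with outputMode("update") each micro-batch prints only the aggregate rows that changed, whereas append mode would reject a streaming aggregation like this one unless a watermark were defined.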
A streaming Dataset or DataFrame can also be registered as a temporary view, so that the business analysis can be expressed in SQL:
package com.kfk.spark.structuredstreaming;

import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.sql.Date;
import java.util.ArrayList;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper3 {
    public static void main(String[] args) throws StreamingQueryException {

        SparkSession spark = CommSparkSession.getSparkSession();

        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());

        List<StructField> fields = new ArrayList<StructField>();
        StructField device = DataTypes.createStructField("device", DataTypes.StringType, true);
        StructField deviceType = DataTypes.createStructField("deviceType", DataTypes.StringType, true);
        StructField signal = DataTypes.createStructField("signal", DataTypes.DoubleType, true);
        StructField deviceTime = DataTypes.createStructField("deviceTime", DataTypes.DateType, true);

        fields.add(device);
        fields.add(deviceType);
        fields.add(signal);
        fields.add(deviceTime);

        // build the schema
        StructType schema = DataTypes.createStructType(fields);

        // convert each input line to a Dataset<Row> matching the schema
        Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() {
            @Override
            public Row call(String value) throws Exception {
                String[] lines = value.split(",");
                return RowFactory.create(lines[0], lines[1], Double.parseDouble(lines[2]),
                        Date.valueOf(lines[3]));
            }
        }, RowEncoder.apply(schema));

        // register a temporary view over the streaming DataFrame
        dfdevice.createOrReplaceTempView("device");

        // result table
        Dataset<Row> dffinal = spark.sql("select * from device");

        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("append")
                .format("console")
                .start();

        query.awaitTermination();
    }
}
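Because the temporary view behaves like any registered table, the SQL can also aggregate. The following sketch (not part of the original example) would replace the result-table and output steps in StruStreamingDFOper3, with spark and the device view already in scope; note that an aggregation forces the output mode from append to update or complete:

// Hypothetical variant: aggregate through SQL on the temporary view.
// A streaming aggregation cannot run in "append" mode without a watermark,
// so the output mode is switched to "update".
Dataset<Row> dfagg = spark.sql(
        "select deviceType, count(*) as cnt from device group by deviceType");

StreamingQuery aggQuery = dfagg.writeStream()
        .outputMode("update")
        .format("console")
        .start();

aggQuery.awaitTermination();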