Dataset and DataFrame Operations in Structured Streaming

Summary: study notes

Creating streaming Datasets and DataFrames

Method 1: using a JavaBean

Java implementation:

package com.kfk.spark.structuredstreaming;
import java.sql.Date;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:38 PM
 */
public class DeviceData {
    // device: string, type: string, signal: double, time: DateType
    private String device;
    private String deviceType;
    private double signal;
    private Date deviceTime;
    public String getDevice() {
        return device;
    }
    public void setDevice(String device) {
        this.device = device;
    }
    public String getDeviceType() {
        return deviceType;
    }
    public void setDeviceType(String deviceType) {
        this.deviceType = deviceType;
    }
    public double getSignal() {
        return signal;
    }
    public void setSignal(double signal) {
        this.signal = signal;
    }
    public Date getDeviceTime() {
        return deviceTime;
    }
    public void setDeviceTime(Date deviceTime) {
        this.deviceTime = deviceTime;
    }
    public DeviceData(String device, String deviceType, double signal, Date deviceTime) {
        this.device = device;
        this.deviceType = deviceType;
        this.signal = signal;
        this.deviceTime = deviceTime;
    }
    public DeviceData(){
    }
}

package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.sql.Date;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());
        // Convert each input line to a Dataset<DeviceData> using the JavaBean encoder
        Dataset<DeviceData> dfdevice = dflines.map(new MapFunction<String, DeviceData>() {
            @Override
            public DeviceData call(String value) throws Exception {
                String[] lines = value.split(",");
                return new DeviceData(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3]));
            }
        }, ExpressionEncoder.javaBean(DeviceData.class));
        // result table
        Dataset<Row> dffinal = dfdevice.select("device","deviceType").where("signal > 10").groupBy("deviceType").count();
        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("update")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
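
The Java examples in these notes obtain their SparkSession from a CommSparkSession helper whose source is not listed here. A minimal sketch of such a helper, assuming a local master and an arbitrary application name (both are assumptions; adapt them to the actual cluster):

package com.kfk.spark.common;
import org.apache.spark.sql.SparkSession;
public class CommSparkSession {
    /**
     * Returns a SparkSession for the streaming examples.
     * The master URL and app name below are placeholders, not the original settings.
     */
    public static SparkSession getSparkSession() {
        return SparkSession.builder()
                .appName("StructuredStreamingDemo")
                .master("local[2]")
                .getOrCreate();
    }
}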

Method 2: constructing a schema

package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper2 {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());
        List<StructField> fields = new ArrayList<StructField>();
        StructField device = DataTypes.createStructField("device",DataTypes.StringType,true);
        StructField deviceType = DataTypes.createStructField("deviceType",DataTypes.StringType,true);
        StructField signal = DataTypes.createStructField("signal",DataTypes.DoubleType,true);
        StructField deviceTime = DataTypes.createStructField("deviceTime",DataTypes.DateType,true);
        fields.add(device);
        fields.add(deviceType);
        fields.add(signal);
        fields.add(deviceTime);
        // Build the schema
        StructType scheme = DataTypes.createStructType(fields);
        // Convert each input line to a Dataset<Row> using the schema
        Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() {
            @Override
            public Row call(String value) throws Exception {
                String[] lines = value.split(",");
                return RowFactory.create(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3]));
            }
        }, RowEncoder.apply(scheme));
        // result table
        Dataset<Row> dffinal = dfdevice.select("device","deviceType").where("signal > 10").groupBy("deviceType").count();
        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("update")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
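
Compared with Method 1, building a StructType avoids writing a JavaBean, but the result is an untyped Dataset<Row>; the bean encoder yields a typed Dataset<DeviceData> whose fields can be accessed directly in subsequent typed operations.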

Scala implementation:

package com.kfk.spark.structuredstreaming
import com.kfk.spark.common.CommSparkSessionScala
import java.sql.Date
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/22
 * @time : 7:31 PM
 */
object StruStreamingDFOperScala {
    case class DeviceData(device: String, deviceType: String, signal: Double, deviceTime: Date)
    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._
        // input table
        val socketDF = spark
                .readStream
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as[String]
        // Build the schema
        val userSchema = new StructType()
                .add("device", "string")
                .add("deviceType", "string")
                .add("signal", "double")
                .add("deviceTime", "date")
        // Convert each line to a Row that matches the schema; signal and deviceTime
        // must be parsed into Double and java.sql.Date to satisfy the declared types
        val df_device = socketDF.map(x => {
            val lines = x.split(",")
            Row(lines(0), lines(1), lines(2).toDouble, Date.valueOf(lines(3)))
        })(RowEncoder(userSchema))
        // Convert to a typed Dataset via the case class
        val df : Dataset[DeviceData] = df_device.as[DeviceData]
        // result table
        val df_final = df.groupBy("deviceType").count()
        // output
        val query = df_final.writeStream
                .outputMode("update")
                .format("console")
                .start()
        query.awaitTermination()
    }
}
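
Note that as[DeviceData] only works because the case class field names and types match the schema, which is why the map above parses signal into a Double and deviceTime into a java.sql.Date.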

Test data:

aa,ss,100,2020-12-21
bb,ss,47,2020-12-21
cc,ss,3,2020-12-21
dd,ss,46,2020-12-21
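
These lines are typed into a socket server listening on the host and port configured above; with netcat, for example, run nc -lk 9999 on bigdata-pro-m04 before starting the application and paste the rows one per line.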

Run result:

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----+
|deviceType|count|
+----------+-----+
|        ss|    3|
+----------+-----+
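
Only three of the four test rows have signal greater than 10 (cc is filtered out with 3.0), so the ss device type counts 3 in the Java examples; the Scala version above applies no filter and would report 4.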

A streaming Dataset or DataFrame can also be registered as a temporary view so that the analysis can be expressed in SQL:

package com.kfk.spark.structuredstreaming;
import com.kfk.spark.common.CommSparkSession;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/21
 * @time : 10:35 PM
 */
public class StruStreamingDFOper3 {
    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = CommSparkSession.getSparkSession();
        // input table
        Dataset<String> dflines = spark.readStream()
                .format("socket")
                .option("host", "bigdata-pro-m04")
                .option("port", 9999)
                .load().as(Encoders.STRING());
        List<StructField> fields = new ArrayList<StructField>();
        StructField device = DataTypes.createStructField("device",DataTypes.StringType,true);
        StructField deviceType = DataTypes.createStructField("deviceType",DataTypes.StringType,true);
        StructField signal = DataTypes.createStructField("signal",DataTypes.DoubleType,true);
        StructField deviceTime = DataTypes.createStructField("deviceTime",DataTypes.DateType,true);
        fields.add(device);
        fields.add(deviceType);
        fields.add(signal);
        fields.add(deviceTime);
        // Build the schema
        StructType scheme = DataTypes.createStructType(fields);
        // Convert each input line to a Dataset<Row> using the schema
        Dataset<Row> dfdevice = dflines.map(new MapFunction<String, Row>() {
            @Override
            public Row call(String value) throws Exception {
                String[] lines = value.split(",");
                return RowFactory.create(lines[0],lines[1],Double.parseDouble(lines[2]), Date.valueOf(lines[3]));
            }
        }, RowEncoder.apply(scheme));
        // Register the streaming Dataset as a temporary view
        dfdevice.createOrReplaceTempView("device");
        // result table
        Dataset<Row> dffinal = spark.sql("select * from device");
        // output
        StreamingQuery query = dffinal.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
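
With the temporary view in place, the result table can be any SQL query over it. A hedged sketch of an aggregating variant (the column names come from the schema above; a streaming aggregation must use outputMode "update" or "complete" rather than "append"):

        // Aggregate over the temporary view instead of selecting every row
        Dataset<Row> dfagg = spark.sql(
                "select deviceType, count(*) as cnt from device where signal > 10 group by deviceType");
        StreamingQuery aggQuery = dfagg.writeStream()
                .outputMode("update")
                .format("console")
                .start();
        aggQuery.awaitTermination();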