Apache IoTDB开发系统整合之TsFile-Spark-Connector

简介: TsFile-Spark-Connector 可以在 SparkSQL By SparkSQL 中将一个或多个 TsFiles 显示为表。它还允许用户指定单个目录或使用通配符来匹配多个目录。如果有多个 TsFiles,则所有 TsFiles 中测量值的并集将保留在表中,并且默认情况下,具有相同名称的度量将具有相同的数据类型。

TsFile-Spark-Connector用户指南

1. TsFile-Spark-Connector简介

TsFile-Spark-Connector 实现了 Spark 对 Tsfile 类型的外部数据源的支持。这使用户能够通过Spark读取,写入和查询Tsfile。

使用此连接器,您可以

  • 将单个 TsFile 从本地文件系统或 hdfs 加载到 Spark 中
  • 将特定目录中的所有文件从本地文件系统或HDFS加载到Spark中
  • 将数据从 Spark 写入 TsFile

2. 系统要求

Spark Version Scala Version Java Version TsFile
2.4.3 2.11.8 1.8 0.10.0

注意:有关如何下载和使用 TsFile 的更多信息,请参阅以下链接:https://github.com/apache/incubator-iotdb/tree/master/tsfile.

3. 快速入门

本地模式

在本地模式下使用 TsFile-Spark-Connector 启动 Spark:

  1. ./<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-0.10.0-jar-with-dependencies.jar

注意:

  • is the real path of your spark-shell.
  • Multiple jar packages are separated by commas without any spaces.

分布式模式

在分布式模式下使用 TsFile-Spark-Connector 启动 Spark(即 Spark 集群通过 spark-shell 连接):

  1. . /<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-{version}-jar-with-dependencies.jar --master spark://ip:7077

4. Data Type Correspondence

TsFile data type SparkSQL data type
BOOLEAN BooleanType
INT32 IntegerType
INT64 LongType
FLOAT FloatType
DOUBLE DoubleType
TEXT StringType


5. 模式推理

显示 TsFile 的方式取决于架构。以以下 TsFile 结构为例: TsFile 架构中有三个度量:状态、温度和硬件。这三项测量的基本信息如下:

Name Type Encode
status Boolean PLAIN
temperature Float RLE
hardware Text PLAIN

TsFile 中的现有数据如下:

time root.ln.wf02.wt02.temperature root.ln.wf02.wt02.status root.ln.wf02.wt02.hardware root.ln.wf01.wt01.temperature root.ln.wf01.wt01.status root.ln.wf01.wt01.hardware
1 null true null 2.2 true null
2 null false aaa 2.2 null null
3 null null null 2.1 true null
4 null true bbb null null null
5 null null null null false null
6 null null ccc null

您也可以使用窄表形式,如下所示:(您可以看到第 6 部分有关如何使用窄格式)

time device_name status hardware temperature
1 root.ln.wf02.wt01 true null 2.2
1 root.ln.wf02.wt02 true null null
2 root.ln.wf02.wt01 null null 2.2
2 root.ln.wf02.wt02 false aaa null
3 root.ln.wf02.wt01 true null 2.1
4 root.ln.wf02.wt02 true bbb null
5 root.ln.wf02.wt01 false null null
6 root.ln.wf02.wt02 null ccc null

6. Scala API

注意:请记住提前分配必要的读取和写入权限。

示例 1:从本地文件系统读取

  1. import org.apache.iotdb.tsfile._
  2. val wide_df = spark.read.tsfile("test.tsfile")
  3. wide_df.show
  4. val narrow_df = spark.read.tsfile("test.tsfile", true)
  5. narrow_df.show

示例 2:从 Hadoop文件系统读取

  1. import org.apache.iotdb.tsfile._
  2. val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. wide_df.show
  4. val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  5. narrow_df.show

示例 3:从特定目录读取

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop")
  3. df.show

注 1:目前不支持目录中所有 TsFiles 的全局时间排序。

注 2:同名测量应具有相同的架构。

示例 4:宽格式查询

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
  5. newDf.show

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select count(*) from tsfile_table")
  5. newDf.show

示例 5:窄格式查询

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
  5. newDf.show

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select count(*) from tsfile_table")
  5. newDf.show

示例 6:以宽格式写入

  1. // we only support wide_form table to write
  2. import org.apache.iotdb.tsfile._
  3. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  4. df.show
  5. df.write.tsfile("hdfs://localhost:9000/output")
  6. val newDf = spark.read.tsfile("hdfs://localhost:9000/output")
  7. newDf.show

示例 6:以窄格式书写

  1. // we only support wide_form table to write
  2. import org.apache.iotdb.tsfile._
  3. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  4. df.show
  5. df.write.tsfile("hdfs://localhost:9000/output", true)
  6. val newDf = spark.read.tsfile("hdfs://localhost:9000/output", true)
  7. newDf.show

附录 A:架构推理的旧设计

显示 TsFile 的方式与 TsFile 架构有关。以以下 TsFile 结构为例:TsFile 的架构中有三个度量:状态、温度和硬件。这三个测量的基本信息如下:

名字 类型 编码
地位 布尔 平原
温度 RLE
硬件 发短信 平原

测量基本信息

文件中的现有数据如下所示:

delta_object:root.ln.wf01.wt01 delta_object:root.ln.wf02.wt02 delta_object:root.sgcc.wf03.wt01
地位 温度 硬件 地位 地位 温度
时间 价值 时间 价值 时间 价值 时间 价值 时间 价值 时间 价值
1 1 2.2 2 “啊” 1 2 3 3.3
3 2 2.2 4 “咔嚓” 2 3 6 6.6
5 3 2.1 6 “抄送” 4 4 8 8.8
7 4 2.0 8 “爹” 5 6 9 9.9

一组时间序列数据

有两种方法可以显示它:

默认方式

将创建两列来存储设备的完整路径:time(LongType)和delta_object(StringType)。

  • time:时间戳,长类型
  • delta_object: Delta_object ID, 字符串类型

接下来,为每个度量创建一个列来存储特定数据。SparkSQL 表结构如下:

时间(长型) delta_object(字符串类型) 状态(布尔类型) 温度(浮子型) 硬件(字符串类型)
1 根.ln.wf01.wt01 2.2
1 根.ln.wf02.wt02
2 根.ln.wf01.wt01 2.2
2 根.ln.wf02.wt02 “啊”
2 根.sgcc.wf03.wt01
3 根.ln.wf01.wt01 2.1
3 根.sgcc.wf03.wt01 3.3
4 根.ln.wf01.wt01 2.0
4 根.ln.wf02.wt02 “咔嚓”
4 根.sgcc.wf03.wt01
5 根.ln.wf01.wt01
5 根.ln.wf02.wt02
5 根.sgcc.wf03.wt01
6 根.ln.wf02.wt02 “抄送”
6 根.sgcc.wf03.wt01 6.6
7 根.ln.wf01.wt01
8 根.ln.wf02.wt02 “爹”
8 根.sgcc.wf03.wt01 8.8
9 根.sgcc.wf03.wt01 9.9

展开delta_object列

将设备列按“.”展开为多个列,忽略根目录“root”。方便进行更丰富的聚合操作。如果用户想使用这种显示方式,需要在表创建语句中设置参数“delta_object_name”(参考本手册第5.5节中的示例1),如本例中参数“delta_object_name”设置为“root.device.turbine”。路径层数需要一对一。此时,将为设备路径的每个层(“根”层除外)创建一列。列名是参数中的名称,值是设备相应层的名称。接下来,将为每个测量创建一个列来存储特定数据。

那么 SparkSQL 表结构如下:

time(LongType) group(StringType) field(StringType) device(StringType) status(BooleanType) temperature(FloatType) hardware(StringType)
1 ln wf01 wt01 True 2.2 null
1 ln wf02 wt02 True null null
2 ln wf01 wt01 null 2.2 null
2 ln wf02 wt02 False null “aaa”
2 sgcc wf03 wt01 True null null
3 ln wf01 wt01 True 2.1 null
3 sgcc wf03 wt01 True 3.3 null
4 ln wf01 wt01 null 2.0 null
4 ln wf02 wt02 True null “bbb”
4 sgcc wf03 wt01 True null null
5 ln wf01 wt01 False null null
5 ln wf02 wt02 False null null
5 sgcc wf03 wt01 True null null
6 ln wf02 wt02 null null “ccc”
6 sgcc wf03 wt01 null 6.6 null
7 ln wf01 wt01 True null null
8 ln wf02 wt02 null null “ddd”
8 sgcc wf03 wt01 null 8.8 null
9 sgcc wf03 wt01 null 9.9

TsFile-Spark-Connector 可以在 SparkSQL By SparkSQL 中将一个或多个 TsFiles 显示为表。它还允许用户指定单个目录或使用通配符来匹配多个目录。如果有多个 TsFiles,则所有 TsFiles 中测量值的并集将保留在表中,并且默认情况下,具有相同名称的度量将具有相同的数据类型。请注意,如果存在名称相同但数据类型不同的情况,TsFile-Spark-Connector 将不保证结果的正确性。

写入过程是将数据帧写入为一个或多个 TsFiles。默认情况下,需要包含两列:时间和delta_object。其余列用作度量。

相关文章
|
23天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
87 2
|
29天前
|
物联网 数据管理 Apache
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
45 1
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
29天前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
30天前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
当今社会,物联网技术的发展带来了许多繁琐的挑战,尤其是在数据库管理系统领域,比如实时整合海量数据、处理流中的事件以及处理数据的安全性。例如,应用于智能城市的基于物联网的交通传感器可以实时生成大量的交通数据。据估计,未来5年,物联网设备的数量将达数万亿。物联网产生大量的数据,包括流数据、时间序列数据、RFID数据、传感数据等。要有效地管理这些数据,就需要使用数据库。数据库在充分处理物联网数据方面扮演着非常重要的角色。因此,适当的数据库与适当的平台同等重要。由于物联网在世界上不同的环境中运行,选择合适的数据库变得非常重要。 原创文字,IoTDB 社区可进行使用与传播 一、什么是IoTDB 我
101 9
Apache IoTDB进行IoT相关开发实践
|
29天前
|
关系型数据库 MySQL 应用服务中间件
win7系统搭建PHP+Mysql+Apache环境+部署ecshop项目
这篇文章介绍了如何在Windows 7系统上搭建PHP、MySQL和Apache环境,并部署ECShop项目,包括安装配置步骤、解决常见问题以及使用XAMPP集成环境的替代方案。
37 1
win7系统搭建PHP+Mysql+Apache环境+部署ecshop项目
|
1月前
|
Java 持续交付 项目管理
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。
Maven是一款基于Apache许可的项目管理和构建自动化工具,在Java开发中极为流行。它采用项目对象模型(POM)来描述项目,简化构建流程。Maven提供依赖管理、标准构建生命周期、插件扩展等功能,支持多模块项目及版本控制。在Java Web开发中,Maven能够自动生成项目结构、管理依赖、自动化构建流程并运行多种插件任务,如代码质量检查和单元测试。遵循Maven的最佳实践,结合持续集成工具,可以显著提升开发效率和项目质量。
38 1
|
19天前
|
Apache 开发者 Java
Apache Wicket揭秘:如何巧妙利用模型与表单机制,实现Web应用高效开发?
【8月更文挑战第31天】本文深入探讨了Apache Wicket的模型与表单处理机制。Wicket作为一个组件化的Java Web框架,提供了多种模型实现,如CompoundPropertyModel等,充当组件与数据间的桥梁。文章通过示例介绍了模型创建及使用方法,并详细讲解了表单组件、提交处理及验证机制,帮助开发者更好地理解如何利用Wicket构建高效、易维护的Web应用程序。
14 0
|
23天前
|
存储 分布式计算 资源调度
Hadoop生态系统概览:从HDFS到Spark
【8月更文第28天】Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它由多个组件构成,旨在提供高可靠性、高可扩展性和成本效益的数据处理解决方案。本文将介绍Hadoop的核心组件,包括HDFS、MapReduce、YARN,并探讨它们如何与现代大数据处理工具如Spark集成。
48 0
|
1月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
36 0

推荐镜像

更多