TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页

本文涉及的产品
表格存储 Tablestore,50GB 3个月
简介: 1. 批量读取操作批量读取操作可以通过多种方式进行,包括:GetRow:根据主键读取一行数据。BatchGetRow:批量读取多行数据。GetRange:根据范围读取多行数据。

1. 批量读取操作

批量读取操作可以通过多种方式进行,包括:

  • GetRow:根据主键读取一行数据。
  • BatchGetRow:批量读取多行数据。
  • GetRange:根据范围读取多行数据。

在进行批量读取操作时,可以设置以下参数:


table_name:要读取的表名。

columns_to_get:要读取的列名,可以为空,表示读取所有列。

max_versions:每个单元格最多返回的版本数。

time_range:读取数据的时间范围。

start_primary_key:读取数据的起始主键。

end_primary_key:读取数据的结束主键。

direction:读取数据的顺序,可以是正序或倒序。

limit:读取数据的限制条数。

filter:读取数据的过滤条件。

过滤条件


过滤条件可以在批量读取操作中使用,用于过滤掉不符合条件的数据。过滤条件可以包括以下类型:


SingleColumnValueFilter:基于单个列的值进行过滤。

CompositeColumnValueFilter:基于多个列的值进行过滤。

ColumnPaginationFilter:基于列的分页过滤。

ColumnPrefixFilter:基于列名前缀的过滤。

MultipleColumnPrefixFilter:基于多个列名前缀的过滤。

ColumnRangeFilter:基于列的范围过滤。

RowFilter:基于行的过滤。

ValueFilter:基于值的过滤。

DependentColumnFilter:基于依赖列的过滤。

2. 分页

分页可以在批量读取操作中使用,用于分批读取数据。分页可以包括以下参数

limit:每页读取的数据条数。

offset:读取数据的偏移量。

表格存储的 SDK 提供了 BatchGetRow、BatchWriteRow、GetRange 和 createRangeIterator 等多行操作的接口,分页查询和倒序查询,过滤条件查询。

3. 代码示例

import com.alicloud.openservices.tablestore.*;
import com.alicloud.openservices.tablestore.model.*;
import java.util.ArrayList;
import java.util.List;
public class TableStoreExample {
    // 设置连接参数
    private static final String ENDPOINT = "your_endpoint";
    private static final String ACCESS_KEY_ID = "your_access_key_id";
    private static final String ACCESS_KEY_SECRET = "your_access_key_secret";
    private static final String INSTANCE_NAME = "your_instance_name";
    private static final String TABLE_NAME = "your_table_name";
    // 创建连接
    private static final SyncClient CLIENT = new SyncClient(ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET, INSTANCE_NAME);
    public static void main(String[] args) {
        try {
            // 创建表
            createTable();
            // 写入数据
            putData();
            // 批量读取数据
            batchGetData();
            // 删除数据
            deleteData();
            // 删除表
            deleteTable();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
    // 创建表
    private static void createTable() {
        TableMeta tableMeta = new TableMeta(TABLE_NAME);
        tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("id", PrimaryKeyType.INTEGER));
        tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("name", PrimaryKeyType.STRING));
        TableOptions tableOptions = new TableOptions(86400, 10);
        CLIENT.createTable(new CreateTableRequest(tableMeta, tableOptions));
    }
    // 写入数据
    private static void putData() {
        for (int i = 0; i < 10; i++) {
            PrimaryKey primaryKey = new PrimaryKey(new PrimaryKeyColumn("id", PrimaryKeyValue.fromLong(i)), new PrimaryKeyColumn("name", PrimaryKeyValue.fromString("name" + i)));
            RowPutChange rowPutChange = new RowPutChange(TABLE_NAME, primaryKey);
            rowPutChange.addColumn(new Column("value", ColumnValue.fromString("value" + i)));
            CLIENT.putRow(new PutRowRequest(rowPutChange));
        }
    }
    // 批量读取数据
    private static void batchGetData() {
        List<PrimaryKey> primaryKeys = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            primaryKeys.add(new PrimaryKey(new PrimaryKeyColumn("id", PrimaryKeyValue.fromLong(i)), new PrimaryKeyColumn("name", PrimaryKeyValue.fromString("name" + i))));
        }
        MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria(TABLE_NAME);
        multiRowQueryCriteria.setRowKeys(primaryKeys);
        BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
        batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);
        BatchGetRowResponse batchGetRowResponse = CLIENT.batchGetRow(batchGetRowRequest);
        for (BatchGetRowResult result : batchGetRowResponse.getBatchGetRowResult()) {
            if (result.isSucceed()) {
                for (Row row : result.getRows()) {
                    System.out.println(row.getColumns().get(0).getValue());
                }
            }
        }
    }
    // 删除数据
    private static void deleteData() {
        for (int i = 0; i < 10; i++) {
            PrimaryKey primaryKey = new PrimaryKey(new PrimaryKeyColumn("id", PrimaryKeyValue.fromLong(i)), new PrimaryKeyColumn("name", PrimaryKeyValue.fromString("name" + i)));
            RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, primaryKey);
            CLIENT.deleteRow(new DeleteRowRequest(rowDeleteChange));
        }
    }
    // 删除表
    private static void deleteTable() {
        CLIENT.deleteTable(new DeleteTableRequest(TABLE_NAME));
    }
}

4. 批量读(BatchGetRow)

批量读接口可以一次请求读取多行数据,参数与 GetRow 接口参数一致。需要注意的是,批量读取的所有行采用相同的参数条件,比如 ColumnsToGet=[colA], 则要读取的所有行都只读取 colA 这一列。

与 BatchWriteRow 接口类似,使用 BatchGetRow 接口时也需要检查返回值。存在部分行失败,而不抛出异常的情况,此时失败行的信息在 BatchGetRowResponse 中。可以通过 BatchGetRowResponse#getFailedRows 方法获取失败的行的信息,通过 BatchGetRowResponse#isAllSucceed 方法可以判断是否所有行都获取成功。

示例

读取 10 行,设置版本条件、要读取的列、过滤器等。

private static void batchGetRow(SyncClient client) {
MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria(TABLE_NAME);
// 加入10个要读取的行
for (int i = 0; i < 10; i++) {
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk" + i));
PrimaryKey primaryKey = primaryKeyBuilder.build();
multiRowQueryCriteria.addRow(primaryKey);
}
// 添加条件
multiRowQueryCriteria.setMaxVersions(1);
multiRowQueryCriteria.addColumnsToGet("Col0");
multiRowQueryCriteria.addColumnsToGet("Col1");
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
singleColumnValueFilter.setPassIfMissing(false);
multiRowQueryCriteria.setFilter(singleColumnValueFilter);
BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
// batchGetRow支持读取多个表的数据, 一个multiRowQueryCriteria对应一个表的查询条件, 可以添加多个multiRowQueryCriteria.
batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);
BatchGetRowResponse batchGetRowResponse = client.batchGetRow(batchGetRowRequest);
System.out.println("是否全部成功:" + batchGetRowResponse.isAllSucceed());
if (!batchGetRowResponse.isAllSucceed()) {
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getFailedRows()) {
System.out.println("失败的行:" + batchGetRowRequest.getPrimaryKey(rowResult.getTableName(), rowResult.getIndex()));
System.out.println("失败原因:" + rowResult.getError());
}
/**
* 可以通过createRequestForRetry方法再构造一个请求对失败的行进行重试.这里只给出构造重试请求的部分.
* 推荐的重试方法是使用SDK的自定义重试策略功能, 支持对batch操作的部分行错误进行重试. 设定重试策略后, 调用接口处即不需要增加重试代码.
*/
BatchGetRowRequest retryRequest = batchGetRowRequest.createRequestForRetry(batchGetRowResponse.getFailedRows());
}
}

5. 批量写(BatchWriteRow)

BatchWriteRow 接口可以在一次请求中进行批量的写入操作,写入操作包括 PutRow、UpdateRow 和 DeleteRow,也支持一次对多张表进行写入。


构造单个操作的过程与使用 PutRow 接口、UpdateRow 接口和 DeleteRow 接口时相同,也支持设置更新条件。


调用 BatchWriteRow 接口时,需要特别注意的是检查返回值。因为是批量写入,可能存在部分行成功部分行失败的情况,此时失败行的 Index 及错误信息在返回的 BatchWriteRowResponse 中,而并不抛出异常。可通过 BatchWriteRowResponse 的 isAllSucceed 方法判断是否全部成功。若不检查,可能会忽略掉部分操作的失败。另一方面,BatchWriteRow 接口也是可能抛出异常的。比如,服务端检查到某些操作出现参数错误,可能会抛出参数错误的异常,在抛出异常的情况下该请求中所有的操作都未执行。

示例

一次 BatchWriteRow 请求,包含 2 个 PutRow 操作,1 个 UpdateRow 操作,1 个 DeleteRow 操作

private static void batchWriteRow(SyncClient client) {
BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
// 构造rowPutChange1
PrimaryKeyBuilder pk1Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk1Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk1"));
RowPutChange rowPutChange1 = new RowPutChange(TABLE_NAME, pk1Builder.build());
// 添加一些列
for (int i = 0; i < 10; i++) {
rowPutChange1.addColumn(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// 添加到batch操作中
batchWriteRowRequest.addRowChange(rowPutChange1);
// 构造rowPutChange2
PrimaryKeyBuilder pk2Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk2Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk2"));
RowPutChange rowPutChange2 = new RowPutChange(TABLE_NAME, pk2Builder.build());
// 添加一些列
for (int i = 0; i < 10; i++) {
rowPutChange2.addColumn(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// 添加到batch操作中
batchWriteRowRequest.addRowChange(rowPutChange2);
// 构造rowUpdateChange
PrimaryKeyBuilder pk3Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk3Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk3"));
RowUpdateChange rowUpdateChange = new RowUpdateChange(TABLE_NAME, pk3Builder.build());
// 添加一些列
for (int i = 0; i < 10; i++) {
rowUpdateChange.put(new Column("Col" + i, ColumnValue.fromLong(i)));
}
// 删除一列
rowUpdateChange.deleteColumns("Col10");
// 添加到batch操作中
batchWriteRowRequest.addRowChange(rowUpdateChange);
// 构造rowDeleteChange
PrimaryKeyBuilder pk4Builder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
pk4Builder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString("pk4"));
RowDeleteChange rowDeleteChange = new RowDeleteChange(TABLE_NAME, pk4Builder.build());
// 添加到batch操作中
batchWriteRowRequest.addRowChange(rowDeleteChange);
BatchWriteRowResponse response = client.batchWriteRow(batchWriteRowRequest);
System.out.println("是否全部成功:" + response.isAllSucceed());
if (!response.isAllSucceed()) {
for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
System.out.println("失败的行:" + batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
System.out.println("失败原因:" + rowResult.getError());
}
/**
* 可以通过createRequestForRetry方法再构造一个请求对失败的行进行重试.这里只给出构造重试请求的部分.
* 推荐的重试方法是使用SDK的自定义重试策略功能, 支持对batch操作的部分行错误进行重试. 设定重试策略后, 调用接口处即不需要增加重试代码.
*/
BatchWriteRowRequest retryRequest = batchWriteRowRequest.createRequestForRetry(response.getFailedRows());
}
}

6. 范围读(GetRange)

范围读取接口用于读取一个范围内的数据。表格存储表中的行都是按照主键排序的,而主键是由全部主键列按照顺序组成的,所以不能理解为表格存储会按照某列主键排序,这是常见的误区。


GetRange 接口支持按照给定范围正序读取和反序读取,可以限定要读取的行数。如果范围较大,已经扫描的行数或者数据量超过一定限制,会停止继续扫描,返回已经获取的行和下一个主键的位置。用户可以根据返回的下一个主键位置,继续发起请求,获取范围内剩余的行。


GetRange 请求的主要参数如下:


Direction:枚举类型,包括 FORWARD、BACKWARD,分别代表正序、反序。


InclusiveStartPrimaryKey:范围的起始主键(包含)。若为反序,起始主键要大于结束主键。


ExclusiveEndPrimaryKey:范围的结束主键(不包含)。若为反序,起始主键要大于结束主键。


Limit:本次请求返回的最大行数。


ColumnsToGet:要读取的列的集合。若不设置,则读取所有列。


MaxVersions:最多读取多少个版本。MaxVersions 与 TimeRange 必须至少设置一个。


TimeRange: 要读取的版本号的范围。MaxVersions 与 TimeRange 必须至少设置一个。


Filter:过滤器。过滤器在服务端对读取的结果再进行一次过滤。

示例

正序读取,判断 NextStartPrimaryKey 是否为 null,读取完范围内的全部数据。

private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(TABLE_NAME);
// 设置起始主键
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(startPkValue));
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
// 设置结束主键
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(endPkValue));
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange的结果为:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
// 若nextStartPrimaryKey不为null, 则继续读取.
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}

7. 迭代读(createRangeIterator)

迭代读取数据。

示例

1private static void getRangeByIterator(SyncClient client, String startPkValue, String endPkValue) {
RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter(TABLE_NAME);
// 设置起始主键
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(startPkValue));
rangeIteratorParameter.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
// 设置结束主键
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn(PRIMARY_KEY_NAME, PrimaryKeyValue.fromString(endPkValue));
rangeIteratorParameter.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeIteratorParameter.setMaxVersions(1);
Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);
System.out.println("使用Iterator进行GetRange的结果为:");
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println(row);
}
}


adfc268b493586dfcc6d9d8fd3eb1057.png

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
11月前
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
94 3
|
2月前
|
存储 SQL 监控
|
2月前
|
DataWorks NoSQL 关系型数据库
DataWorks产品使用合集之如何从Tablestore同步数据到MySQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
4月前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
60 1
|
存储 NoSQL Java
OTS(Table Store)
OTS(Table Store)是阿里云提供的分布式NoSQL数据库服务,支持海量结构化数据的存储、查询和分析。OTS具有高可用、高性能、高扩展性和低成本等特点,适用于各种场景下的数据存储和处理,例如电商、物流、游戏等。
3866 2
|
SQL 存储 弹性计算
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1732 0
|
存储 消息中间件 NoSQL
物联网数据通过规则引擎流转到OTS|学习笔记
快速学习物联网数据通过规则引擎流转到OTS
304 0
物联网数据通过规则引擎流转到OTS|学习笔记
|
存储 负载均衡 开发者
表格存储数据多版本介绍| 学习笔记
快速学习表格存储数据多版本介绍。
269 0
表格存储数据多版本介绍| 学习笔记
|
存储 NoSQL 关系型数据库
基于TableStore的海量气象格点数据解决方案实战 王怀远
基于TableStore的海量气象格点数据解决方案实战 王怀远
403 0
基于TableStore的海量气象格点数据解决方案实战 王怀远