表格存储:使用TableStoreWriter进行高并发、高吞吐的数据写入

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
日志服务 SLS,月写入数据量 50GB 1个月
简介: 表格存储(原OTS)的一大特性是能够支撑海量数据的高并发、高吞吐率的写入,特别适合日志数据或物联网场景(例如轨迹追踪或溯源)数据的写入和存储。这些场景的特性是,会在短时间内产生大量的数据需要消化并写入数据库,需要数据库能够提供高并发、高吞吐率的写入性能,需要满足每秒上万行甚至上百万行的写入吞吐率。针

概述

   表格存储(原OTS)的一大特性是能够支撑海量数据的高并发、高吞吐率的写入,特别适合日志数据或物联网场景(例如轨迹追踪或溯源)数据的写入和存储。这些场景的特性是,会在短时间内产生大量的数据需要消化并写入数据库,需要数据库能够提供高并发、高吞吐率的写入性能,需要满足每秒上万行甚至上百万行的写入吞吐率。针对这些场景,我们在存储层做了很多的优化(本篇文章不赘述),同时在SDK接口层也做了一些优化,专门提供了一个简单易用、高性能的数据导入接口。

   TableStoreWriter是基于Java SDK的异步接口,封装的一层专门用于高并发、高吞吐率数据导入的接口。本篇文章主要会介绍TableStoreWriter的适用场景、底层架构以及如何使用。

适用场景

特性

如果你的应用场景,满足以下特点,则可以考虑使用TableStoreWriter来作为数据写入的入口:


特点一: 高并发,对吞吐率要求很高

需要高并发的数据写入,非写入行的吞吐率要求很高。例如日志场景,需要分布式的采集日志,采集点可能很多;需要在短时间内将这些产生的日志消费掉,导入到数据库中,衡量导入性能的指标是每秒消费多少MB的日志数据。


特点二:对单条数据的写入延迟没有要求

应用场景需要的是高写入吞吐率,而不是单条数据的写入延迟。还是拿日志场景举例,日志场景对写入的要求是每秒能处理多少条日志,而不在乎一条日志从产生到最终写入的延迟。这是典型的离线和在线场景的区别,在线场景要求反馈是及时的。从延迟的量级上来讲,在线场景可能要求数据写入在毫秒级别,而离线场景可能可以接受数据写入延迟在百毫秒级别。

为啥TableStoreWriter要求应用对单行导入的延迟没有要求?这与TableStoreWriter内部优化写入吞吐率相关,为了最大化利用存储层写入的性能,TableStoreWriter内部会做数据缓冲,尽量发送大的数据包,而数据缓冲需要数据从写入到发送有一个暂缓。


特点三:写入可异步化(可采用生产者消费者模型)

TableStoreWriter为提高写入吞吐率,做的一个优化即异步化。异步化有很多的好处,包括数据写入可以更聚集,可以提供更高的写入并发等。

所以对于应用层,需要能够接受写入异步化。异步化代表的意思是,数据写入的触发线程,不需要同步的等待该行数据是否写入成功还是失败的反馈,数据写入失败或成功的处理可以被异步的执行。

类似的架构为:生产者将数据写入一个队列,而不用管该数据何时被消费,消费者异步的消费数据。


特点四:同一条数据可重复写入

TableStoreWriter无法避免一条数据可能被重复的写入,重复的原因有很多,例如网络超时重传等。在非事务的写入模式下,都很难保证一条数据不被重复写入,而如果带了事务的写入,则性能都不会好。TableStoreWriter重性能,所以需要应用能接受一条数据被重复的写入。

典型场景

日志存储

日志有其非常典型的特点:

  • 海量:日志的产出代价是比较小的,随着应用规模的增大,日志数据体量会非常大。
  • 要求高吞吐率:对单条日志从产出到写入的延迟没有要求,而重视的是消费短时间内产生的大量日志数据的吞吐率。
  • 处理可异步化:日志是业务性比较低的数据,一般不在业务的主线上,通常是离线处理,所以可异步化。
  • 可重复写入:日志是固化的数据,重复写入也不会影响数据的正确性。

消息系统

消息系统例如即时通讯,特点同样是:

  • 海量(例如写入放大的群消息等)
  • 要求高吞吐率:对单条消息的延迟不需要很高,可接受百毫秒级别的延迟,但是更注重的是短时间内产生的海量数据的写入(投递)速度。
  • 处理可异步化:消息的处理是完全可以被异步化的
  • 可重复写入:消息通常都会标注唯一的消息ID,且消息产生后不会更改,所以重复写入不会带来什么问题。

分布式队列消费

分布式队列的应用场景非常广,被广泛用在复杂的分布式系统中。它在提供高性能的消息传递之外,对架构的好处在于能够解耦模块之间的依赖,简化系统的架构。

若您的应用架构中,也用到了分布式队列,并且数据的消费者之一是将数据导入到表格存储数据库中,那也可以考虑使用TableStoreWriter。TableStoreWriter自身也是一个生产者消费者模型,与分布式队列的适用场景有相似之处。


架构解析

层次关系


图1 OTSWriter与SDK的层次关系

    如图1所示,TableStoreWriter是基于SDK层接口之上重新包装的一层接口,它与TableStore Java SDK的关系是:

  • 依赖了SDK提供的AsyncClient异步接口
  • 导入数据会使用BatchWriteRow接口
  • 单行异常重试依赖SDK提供的RetryStrategy

内部架构


图2 OTSWriter内部架构  

      如图2所示,为TableStoreWriter的内部架构。  

      如果直接使用TableStore Java SDK的接口,可以一样的完成数据导入的需求,但是TableStoreWriter在接口易用性和性能上做了一些优化,包括:  

  • 使用异步而非同步接口:旨在为了使用更少的线程但提供更高的并发。
  • 自动数据聚合:在内存中使用缓冲队列,让一次发给表格存储的批量写请求尽量大,提供写入吞吐率。
  • 采用生产者消费者模式: 比较传统的,更易于异步化和数据聚集的一种架构。
  • 使用高性能的数据交换队列:选用Disruptor RingBuffer,经过性能测试,采用多生产者单消费者的模型。
  • 屏蔽复杂的BatchWriteRow请求封装:通过SDK预检查,自动过滤脏数据(主键格式与表预定义的不符、行大小超限、行列数超限等),避免到了服务端后再抛错返回;自动处理请求限制(例如一次批量的行数限制、一次批量的大小限制等);
  • 行级别callback:SDK提供请求级别的callback,TableStoreWriter提供行级别的callback,让业务逻辑专注于处理行数据,完全屏蔽底层的请求单元。
  • 行级别重试:请求级别重试失败,会根据特定的错误码,转换为行级别的重试,最大程度保证行的写入成功率。

如何使用

配置


ClientConfiguration cc = new ClientConfiguration();
cc.setRetryStrategy(new DefaultRetryStrategy()); // 可定制重试策略,若需要保证数据写入成功率,可采用更激进的重试策略
AsyncClient asyncClient = new AsyncClient(endPoint, accessId, accessKey, instanceName, cc);

// 初始化
WriterConfig config = new WriterConfig();
config.setMaxBatchSize(4 * 1024 * 1024); // 配置一次批量导入请求的大小限制,默认是4MB
config.setMaxColumnsCount(128); // 配置一行的列数的上限,默认128列
config.setBufferSize(1024); // 配置内存中最多缓冲的数据行数,默认1024行,必须是2的指数倍
config.setMaxBatchRowsCount(100); // 配置一次批量导入的行数上限,默认100
config.setConcurrency(10); // 配置最大并发数,默认10
config.setMaxAttrColumnSize(2 * 1024 * 1024); // 配置属性列的值大小上限,默认是2MB
config.setMaxPKColumnSize(1024); // 配置主键列的值大小上限,默认1KB
config.setFlushInterval(10000); // 配置缓冲区flush的时间间隔,默认10s

// 配置一个callback,OTSWriter通过该callback反馈哪些导入成功,哪些行导入失败,该callback只简单的统计写入成功和失败的行数。

AtomicLong succeedCount = new AtomicLong();
AtomicLong failedCount = new AtomicLong();
TableStoreCallback callback = new SampleCallback(succeedCount, failedCount);
ExecutorService executor = Executors.newFixedThreadPool(2);
TableStoreWriter tablestoreWriter = new DefaultTableStoreWriter(asyncClient, tableName, config, callback, executor);



初始化一个TableStoreWriter需要以下几个配置参数:

  1. AsyncClient:一个提供异步调用的TableStore client,注意由于重试策略是依赖于SDK自身的重试策略,所以若需要定制批量写数据的重试策略,需要在这个Client中配置,如示例所示。
  2. WriterConfig:OTSWriter的相关配置,主要包括:限制项(一次批量写的行数上限、一次请求的大小限制等)、并发数(异步并发写入的并发数上限)等。
  3. TableStoreCallback:处理行级别成功或失败的callback。
  4. ExecutorService:用于处理callback调用的executor thread pool。

接口


int start = id * rowsCount;
for (int i = 0; i < rowsCount; i++) {
    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("gid", PrimaryKeyValue.fromLong(start + i))
        .addPrimaryKeyColumn("uid", PrimaryKeyValue.fromLong(start + i)).build();

    RowPutChange rowChange = new RowPutChange(tableName);
    rowChange.setPrimaryKey(primaryKey);
    rowChange.addColumn("col1", ColumnValue.fromBoolean(true));
    rowChange.addColumn("col2", ColumnValue.fromLong(10));
    rowChange.addColumn("col3", ColumnValue.fromString("Hello world."));

    tablestoreWriter.addRowChange(rowChange);
}



往TableStoreWriter内写数据非常的简单,根据相应的请求(RowPutChange、RowUpdateChange或RowDeleteChange)构造不同的RowChange,直接往TableStoreWriter内扔即可。

Callback

private static class SampleCallback implements TableStoreCallback {
    private AtomicLong succeedCount;
    private AtomicLong failedCount;

    public SampleCallback(AtomicLong succeedCount, AtomicLong failedCount) {
        this.succeedCount = succeedCount;
        this.failedCount = failedCount;
    }

    @Override
    public void onCompleted(RowChange req, ConsumedCapacity res) {
succeedCount.incrementAndGet();    }    @Override    public void onFailed(RowChange req, Exception ex) {
ex.printStackTrace();        failedCount.incrementAndGet();    } }


TableStoreWriter通过callback来反馈行级别的成功或者失败,若成功,即调用onComplete函数,若失败,根据异常的类别,调用对应的onFailed函数。


相关文章

1. 表格存储Java SDK性能优化经验


联系我们


相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
120 3
|
8月前
|
缓存 安全 API
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的API网关设计实现
公司对外开放的OpenAPI-Server服务,作为核心内部系统与外部系统之间的重要通讯枢纽,每天处理数百万次的API调用、亿级别的消息推送以及TB/PB级别的数据同步。经过多年流量的持续增长,该服务体系依然稳固可靠,展现出强大的负载能力。
186 9
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的API网关设计实现
|
16小时前
|
NoSQL 关系型数据库 MySQL
招行面试:高并发写,为什么不推荐关系数据?
资深架构师尼恩针对高并发场景下为何不推荐使用关系数据库进行数据写入进行了深入剖析。文章详细解释了关系数据库(如MySQL)在高并发写入时的性能瓶颈,包括存储机制和事务特性带来的开销,并对比了NoSQL数据库的优势。通过具体案例和理论分析,尼恩为读者提供了系统化的解答,帮助面试者更好地应对类似问题,提升技术实力。此外,尼恩还分享了多个高并发系统的解决方案及优化技巧,助力开发者在面试中脱颖而出。 文章链接:[原文链接](https://mp.weixin.qq.com/s/PKsa-7eZqXDg3tpgJKCAAw) 更多技术资料和面试宝典可关注【技术自由圈】获取。
|
3月前
|
缓存 负载均衡 API
抖音抖店API请求获取宝贝详情数据、原价、销量、主图等参数可支持高并发调用接入演示
这是一个使用Python编写的示例代码,用于从抖音抖店API获取商品详情,包括原价、销量和主图等信息。示例展示了如何构建请求、处理响应及提取所需数据。针对高并发场景,建议采用缓存、限流、负载均衡、异步处理及代码优化等策略,以提升性能和稳定性。
|
7月前
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
546 0
|
6月前
|
DataWorks NoSQL 关系型数据库
DataWorks产品使用合集之如何从Tablestore同步数据到MySQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
安全 Java 数据库
在Java中实现高并发的数据访问控制
在Java中实现高并发的数据访问控制
|
8月前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之可以使用高并发大内存的方式读取存量数据吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
115 3
|
8月前
|
存储 消息中间件 Java
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的高可靠消息服务设计实现
在深入研究了 **“【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的API网关设计实现”** 设计实现后,我们意识到,尽管API网关为服务商提供了高效的数据获取手段,但实时数据的获取仍然是一个亟待解决的问题。
111 1
【亿级数据专题】「高并发架构」盘点本年度探索对外服务的百万请求量的高可靠消息服务设计实现
|
8月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。