TableStore: 海量结构化数据实时备份实战

本文涉及的产品
对象存储 OSS,20GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
阿里云盘企业版 CDE,企业版用户数5人 500GB空间
简介: # TableStore: 海量结构化数据实时备份实战 ## 数据备份简介 在信息技术与数据管理领域,备份是指将文件系统或数据库系统中的数据加以复制,一旦发生灾难或者错误操作时,得以方便而及时地恢复系统的有效数据和正常运作。

TableStore: 海量结构化数据实时备份实战

数据备份简介

在信息技术与数据管理领域,备份是指将文件系统或数据库系统中的数据加以复制,一旦发生灾难或者错误操作时,得以方便而及时地恢复系统的有效数据和正常运作。在实际备份过程中,最好将重要数据制作三个或三个以上的备份,并且放置在不同的场所异地备援,以供日后回存之用。

备份有两个不同的目的,其主要的目的是在数据丢失后恢复数据,无论数据是被删除还是被损坏。备份的第二个目的是根据用户定义的数据保留策略从较早的时间恢复数据,通常在备份应用程序中配置需要备份多长时间的数据副本。

由于备份系统至少会包含一个被认为值得保存的所有数据的副本,因此对数据存储的要求可能会很高,组织此存储空间和管理备份过程可能是一项复杂的任务。如今,有许多不同类型的数据存储设备可用于进行备份,还可以通过许多不同的方式来安排这些设备以提供地理冗余,数据安全性和可移植性。

在将数据发送到其存储位置之前,会选择,提取和操作它们,目前已经有许多不同的技术来优化备份过程,其中包括处理打开的文件(open files)和实时数据源的优化,以及压缩,加密和重复数据删除等。每个备份方案都应包括演习过程,以验证正在备份的数据的可靠性,更重要的是要认识到任何备份方案中涉及的限制和人为因素。

TableStore备份需求分析

对于存储系统而言,数据的安全可靠永远是第一位的,要保障数据尽可能高的可靠性,需要从两个方面保障:

  • 存储系统本身的数据可靠性:表格存储(TableStore)是阿里云自研的面向海量结构化数据存储的Serverless NoSQL多模型数据库,提供了99.9999999%的数据可靠性保证,在业界属于非常非常高的标准了。
  • 误操作后能恢复数据:误操作永远都无法避免,要做的是当误操作发生的时候能尽快恢复,那么就需要有备份数据存在。对于备份,有两种方案,一个是部署同城或异地灾备,这种代价高费用高,更多的用于社会基础信息或金融信息。另一种是将数据备份到另一个价格更低廉的系统,当万一出现误操作的时候,可以有办法恢复就行。一般可以选择文件存储系统,比如阿里云OSS。

TableStore备份恢复方案简介

下图为TableStore备份恢复的逻辑结构图,基于全增量一体的通道服务我们可以很容易的构建一整套的数据备份和数据恢复方案,同时具备了实时增量备份能力和秒级别的恢复能力。只要提前配置好备份和恢复的计划,整个备份恢复系统可以做到完全的自动化进行。
1551758693917-76231ab2-0f59-401e-b0c6-a4eab383d766.png

TableStore备份恢复方案实战

目前表格存储虽然未推出官方的备份和恢复功能,但是笔者会step-by-step的带大家基于表格存储的通道服务设计属于自己的专属备份恢复方案,实战步骤会分为备份和恢复两部分。

  1. 备份

    • 预准备阶段:需要确定待备份的数据源和备份的目的源,在此次的实战中,分别对应着TableStore的表和OSS的Bucket。
    • 确定备份计划和策略
    • 使用通道服务SDK编写代码
    • 执行备份策略并监测备份过程
  2. 恢复

    • 执行文件恢复

确定备份计划和策略

备份策略需要确定备份内容、备份时间和备份方式,目前常见的备份策略主要有以下几类。

  • 全量备份(Full Backup): 把硬盘或数据库内的所有文件、文件夹或数据进行一次性的复制。
  • 增量备份(Incremental Backup): 对上一次的全量备份或增量备份后更新的数据进行的备份。
  • 差异备份(Differential Backup): 提供运行全量备份后变更文件的备份。
  • 选择式备份:对系统数据的一部分进行的备份。
  • 冷备份:系统处于停机或维护状态下的备份。这种情况下,备份的数据与系统该时段的数据应该完全一致。
  • 热备份:系统处于正常运转状态下的备份。这种情况下,由于系统中的数据可能随时在更新,备份的数据相对于系统的真实数据可能会有一定的滞后。

在此次的实战中,笔者对于备份计划和策略的选择如下(这里可以根据自己的需求进行自定义设计,然后利用通道服务的SDK来完成相应的需求)。

  • 备份内容:TableStore 数据表
  • 备份时间:

    • 全量备份定期执行(时间可调,默认为一周)。
    • 增量备份根据配置定期执行,表格存储的通道服务可以保障数据的严格有序性,每个增量文件是流式append的,可随时被消费。
  • 备份方式:全量备份+增量备份,热备份。

使用通道服务SDK编写代码

这部分会结合代码片段的形式讲解,详细的代码后续会开源出去(链接会更新在本文中),尽请期待。在进行实战之前,推荐先阅读通道服务Java SDK的 使用文档

  1. 创建一个全量+增量类型的Tunnel,这里可以使用SDK或者官网控制台进行创建。  
private static void createTunnel(TunnelClient client, String tunnelName) {
    CreateTunnelRequest request = new CreateTunnelRequest(TableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = client.createTunnel(request);
    System.out.println("RequestId: " + resp.getRequestId());
    System.out.println("TunnelId: " + resp.getTunnelId());
}
  1. 了解通道服务自动化数据处理框架 

在通道服务的快速开始里,我们可以看到用户只需要传入处理数据的process函数和shutdown函数即可完成自动化的数据处理。在process函数的入参中会带有List,而StreamRecord包装的正是TableStore里每一行的数据,包括Record的类型,主键列,属性列,用户写入Record的时间戳等。

  1. 设计TableStore每一行数据的持久化格式 

本次实战中我们使用CSV文件格式,当然你也可以用pb或者其它格式,CSV的优点是可读性比较强,每一行数据会对应CSV文件的一行,持久化的格式如下图所示,CSV文件会有四列, TimeStamp列是数据写入TableStore的时间戳(全量时都为0,增量备份为具体的时间戳),RecordType是数据行的操作类型(PUT, UPDATE和DELETE),PrimaryKey为主键列的JSON字符串表示,RecordColumns为属性列的JSON字符串表示。
1551774132795-10bae3a8-0a54-413c-bb94-79810ff87774.png
转换部分的核心代码参见如下代码片段,笔者处理这部分用的是univocity-parsers(CSV)和Gson库,有几个地方需要特别注意下:1). Gson反序列化Long类型会转为Number类型,可能会造成进度丢失,有若干解决办法,笔者采用的是将其转为String类型序列化。2). 对于binary类型的数据的特殊处理,笔者进行了base64的编解码。3). 可以直接流式写入OSS中,减少本地持久化的消耗。

this.gson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter())
           .setLongSerializationPolicy(LongSerializationPolicy.STRING).create();
// ByteArrayOutputStream到ByteArrayInputStream会有一次array.copy, 可考虑用管道或者NIO channel.
public void streamRecordsToOSS(List<StreamRecord> records, String bucketName, String filename, boolean isNewFile) {
   if (records.size() == 0) {
       LOG.info("No stream records, skip it!");
       return;
   }
   try {
       CsvWriterSettings settings = new CsvWriterSettings();
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       CsvWriter writer = new CsvWriter(out, settings);
       if (isNewFile) {
           LOG.info("Write csv header, filename {}", filename);
           List<String> headers = Arrays.asList(RECORD_TIMESTAMP, RECORD_TYPE, PRIMARY_KEY, RECORD_COLUMNS);
           writer.writeHeaders(headers);
           System.out.println(writer.getRecordCount());
       }
       List<String[]> totalRows = new ArrayList<String[]>();
       LOG.info("Write stream records, num: {}", records.size());
       for (StreamRecord record : records) {
           String timestamp = String.valueOf(record.getSequenceInfo().getTimestamp());
           String recordType = record.getRecordType().name();
           String primaryKey = gson.toJson(
               TunnelPrimaryKeyColumn.genColumns(record.getPrimaryKey().getPrimaryKeyColumns()));
           String columns = gson.toJson(TunnelRecordColumn.genColumns(record.getColumns()));
           totalRows.add(new String[] {timestamp, recordType, primaryKey, columns});
       }
       writer.writeStringRowsAndClose(totalRows);

       // write to oss file
       ossClient.putObject(bucketName, filename, new ByteArrayInputStream(out.toByteArray()));
   } catch (Exception e) {
       e.printStackTrace();
   }
}

执行备份策略并监测备份过程

  1. 运行通道服务的自动化数据框架,挂载上一步中设计的备份策略代码。
public class TunnelBackup {
    private final ConfigHelper config;
    private final SyncClient syncClient;
    private final CsvHelper csvHelper;
    private final OSSClient ossClient;

    public TunnelBackup(ConfigHelper config) {
        this.config = config;
        syncClient = new SyncClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),
            config.getInstanceName());
        ossClient = new OSSClient(config.getOssEndpoint(), config.getAccessId(), config.getAccessKey());
        csvHelper = new CsvHelper(syncClient, ossClient);
    }

    public void working() {
        TunnelClient client = new TunnelClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),
            config.getInstanceName());
        OtsReaderConfig readerConfig = new OtsReaderConfig();
        TunnelWorkerConfig workerConfig = new TunnelWorkerConfig(
            new OtsReaderProcessor(csvHelper, config.getOssBucket(), readerConfig));
        TunnelWorker worker = new TunnelWorker(config.getTunnelId(), client, workerConfig);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            worker.shutdown();
            client.shutdown();
        }
    }

    public static void main(String[] args) {
        TunnelBackup tunnelBackup = new TunnelBackup(new ConfigHelper());
        tunnelBackup.working();
    }
}
  1. 监测备份过程

备份过程的监控可以通过表格存储控制台或者 DescribeTunnel 接口完成,DescribeTunnel接口可以实时获取当前Tunnel下每一个Channel的Client(可以自定义), 消费总行数和消费位点等信息。比如用户有下午2点-3点的数据需要同步,若DescribeTunnel获取到的消费位点为下午2点半,则表示备份到一半了,若获取到的消费位点为下午3点,则代表2点-3点的数据已经备份完毕了。

执行文件恢复

在数据备份完成后,我们可以根据需求进行全量的恢复和部分数据的恢复,来降低RTO。在恢复数据时,可以有一些措施来优化恢复的性能:

  • 从OSS下载时,可以使用流式下载或者分片下载的方式,必要时可以增加并发。
  • 写入TableStore时,可以使用并发BatchWrite的方式。

下图为Restore的实例代码(略去若干细节),首先根据需求算出需要下载的文件名(和备份策略对应),然后用OSS的流式下载读取相应的文件,最后用并发的BatchWrite方式写入TableStore的恢复表中。

public class TunnelRestore {
    private ConfigHelper config;
    private final SyncClient syncClient;
    private final CsvHelper csvHelper;
    private final OSSClient ossClient;

    public TunnelRestore(ConfigHelper config) {
        this.config = config;
        syncClient = new SyncClient(config.getEndpoint(), config.getAccessId(), config.getAccessKey(),
            config.getInstanceName());
        ossClient = new OSSClient(config.getOssEndpoint(), config.getAccessId(), config.getAccessKey());
        csvHelper = new CsvHelper(syncClient, ossClient);
    }

    public void restore(String filename, String tableName) {
        csvHelper.parseStreamRecordsFromCSV(filename, tableName);
    }

    public static void main(String[] args) {
        TunnelRestore restore = new TunnelRestore(new ConfigHelper());
        restore.restore("FullData-1551767131130.csv", "testRestore");
    }
}

总结

本文首先介绍了TableStore备份的需求,接着介绍了使用表格存储的通道服务进行数据备份和恢复的原理,最后结合实际的代码片段讲述了如何一步步的构建Tabel Store数据备份恢复方案。

参考链接

  1. https://en.wikipedia.org/wiki/Backup
相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
121 3
|
SQL NoSQL 数据可视化
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1770 0
玩转Tablestore:使用Grafana快速展示时序数据
|
6月前
|
DataWorks NoSQL 关系型数据库
DataWorks产品使用合集之如何从Tablestore同步数据到MySQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
8月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
8月前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
81 1
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
946 0
|
存储 消息中间件 NoSQL
物联网数据通过规则引擎流转到OTS|学习笔记
快速学习物联网数据通过规则引擎流转到OTS
348 15
物联网数据通过规则引擎流转到OTS|学习笔记
|
存储 负载均衡 开发者
表格存储数据多版本介绍| 学习笔记
快速学习表格存储数据多版本介绍。
表格存储数据多版本介绍| 学习笔记
|
存储 NoSQL 关系型数据库
基于TableStore的海量气象格点数据解决方案实战 王怀远
基于TableStore的海量气象格点数据解决方案实战 王怀远
439 0
基于TableStore的海量气象格点数据解决方案实战 王怀远
|
存储 SQL 运维
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践
前言:从最早的互联网高速发展、到移动互联网的爆发式增长,再到今天的产业互联网、物联网的快速崛起,各种各样新应用、新系统产生了众多订单类型的需求,比如电商购物订单、银行流水、运营商话费账单、外卖订单、设备信息等,产生的数据种类和数据量越来越多;其中订单系统就是一个非常广泛、通用的系统。而随着数据规模的快速增长、大数据技术的发展、运营水平的不断提高,包括数据消费的能力要求越来越高,这对支撑订单系统的数据库设计、存储系统也提出了更多的要求。在新的需求下,传统的经典架构面临着诸多挑战,需要进一步思考架构优化,以更好支撑业务发展;
834 0
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践