Canal监听MySQL Binarylog消费实践

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 在MySQL作为如今最为主流使用的数据库背景下,除了常规的数据存储使用场景,还存在大量的使用需求,如:数据自动同步,数据更新监听等场景。由于数据库层面的增量数据变动无法依靠应用服务层面进行有效感知,因此,还是需要从数据库自身提供的机制入手进行实现处理。下面为将展示关于如何借助Canal实践解决场景的几个业务场景问题

背景

在MySQL作为如今最为主流使用的数据库背景下,除了常规的数据存储使用场景,还存在大量的使用需求,如:数据自动同步,数据更新监听等场景。由于数据库层面的增量数据变动无法依靠应用服务层面进行有效感知,因此,还是需要从数据库自身提供的机制入手进行实现处理。下面为将展示关于如何借助Canal实践解决场景的几个业务场景问题。

Canal简述

Github开源地址:https://github.com/alibaba/canal
image.png

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)。
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)。
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据。

Canal工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。
  • canal 解析 binary log 对象(原始为 byte 流)。

Canal服务范围

  • 当前Canal支持源端MYSQL的版本包括:5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。
  • Canal直接支持的写入目标类型包括:MYSQL、Kafka、elasticsearch、Hbase、RocketMQ等。由于Datahub直接支持Kafka协议的写入,所以Canal服务也可以支持往Datahub中写入Binary Log数据。

Canal消费方式

Canal在伪装成为目标MySQL的一个Slave节点后,获取到来自主节点的BinaryLog日志内容。那么作为BinaryLog消费者该如何使用canal监听得到的内容呢。Canal为我们提供了两种类型的方式,直接消费和投递。直接消费即使用Canal配套提供的客户端程序,即时消费Canal的监听内容。投递是指配置指定的MQ类型以及对应信息,Canal将会按照BinaryLog的条目投递到指定的MQ下,再交由MQ为各种消费形式提供数据消费。

Canal客户端消费

Canal官方SDK提供地址:https://github.com/alibaba/canal/wiki/ClientExample

  • 消费使用代码摘要与简单说明:
// List<Entry>为一次消费包含的多条BinaryLog数据内容
private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            // 行变动数据存放对象,其中rowDatasList为更新前后数据内容
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }
        // 当前BinaryLog动作类型
        EventType eventType = rowChage.getEventType();
        
        // entry.getHeader()存放当前变动数据对应的Schema、Table信息
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

Canal投递MQ

Canal暂时提供的投递MQ类型包括:

投递配置

以投递到RocketMQ为样例提供Canal版本为1.1.4的配置样例(对比发现1.1.5开始配置内容有所改动,官网文档说明内容有所存疑,因为为大家提供可用1.1.4版本为配置样例参考),MQ使用大禹平台RocketMQ为样例。
  • canal.properties
# 改动部分一
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =${大禹账号AccessKey}
canal.aliyun.secretKey = ${大禹账号SecretKey}

#改动部分二
##################################################
#########              MQ              #############
##################################################
canal.mq.servers = ${大禹MQ访问地址}
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = ${大禹生产者Group}
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
  • instance.properties
# position info
canal.instance.master.address=${canal服务监听的MySQL地址}

canal.instance.dbUsername=${数据库访问用户名}
canal.instance.dbPassword=${数据库访问密码}

# table regex
canal.instance.filter.regex=${监听数据表匹配表达式}
# mq config
canal.mq.topic=${大禹MQ投递Topic名称}

投递内容数据结构展示

与客户端直接消费不同,canal投递上MQ的消息内容为文本内容,下面为大家展示MQ中一条BinaryLog的格式内容,直观地感受可以使用的相关信息。

  • 样例数据
{
    "data": [
        {
            "id": "4971874",
            "code": "45ffb86",
            "name": "组织1",
            "display_name": "组织1",
            "simple_name": "组织1",
            "parent_code": "da43c609",
            "type": "R",
            "sub_type": null,
            "comment": null,
            "level": "2",
            "region_id": null,
            "dutyuser_id": null,
            "ou_code": null,
            "cry": null,
            "line": null,
            "manager": null,
            "address": null,
            "create_by": null,
            "create_on": null,
            "last_modify_by": "20443",
            "last_modify_on": "2021-12-09 06:00:03"
        }
    ],
    "database": "test",
    "es": 1617179083000,
    "id": 7,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(20)",
        "code": "varchar(255)",
        "name": "varchar(255)",
        "display_name": "varchar(255)",
        "simple_name": "varchar(128)",
        "parent_code": "varchar(255)",
        "type": "varchar(32)",
        "sub_type": "varchar(32)",
        "comment": "varchar(2040)",
        "level": "int(11)",
        "region_id": "varchar(128)",
        "dutyuser_id": "varchar(32)",
        "ou_code": "varchar(32)",
        "cry": "varchar(32)",
        "line": "varchar(32)",
        "manager": "varchar(255)",
        "address": "varchar(255)",
        "create_by": "bigint(20)",
        "create_on": "datetime",
        "last_modify_by": "bigint(20)",
        "last_modify_on": "datetime"
    },
    "old": [
        {
            "last_modify_by": null
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "code": 12,
        "name": 12,
        "display_name": 12,
        "simple_name": 12,
        "parent_code": 12,
        "type": 12,
        "sub_type": 12,
        "comment": 12,
        "level": 4,
        "region_id": 12,
        "dutyuser_id": 12,
        "ou_code": 12,
        "cry": 12,
        "line": 12,
        "manager": 12,
        "address": 12,
        "create_by": -5,
        "create_on": 93,
        "last_modify_by": -5,
        "last_modify_on": 93
    },
    "table": "test2",
    "ts": 1617179084459,
    "type": "UPDATE"
}

上述展示的内容为更新其中一条数据中的"last_modify_by"字段的内容。借助上面的BinaryLog样例,可以关注到一下内容:

  • 库表信息:字段"database","table"分别展示了数据库名称以及库表名称。
  • 数据表结构:mysqlType中囊括了当前BinaryLog中操作的数据表的具体字段结构,包括字段名称、字段类型;
  • 操作类型:字段"type"显示了该BinaryLog内容对应的操作类型。主要包括:INSERT、UPDATE、DELETE。
  • 更新前后的数据内容:"data"中可以看到组成为数组List,说明一条BinaryLog数据可能包括多条更新后的数据内容。同样地,从"old"字段中也可以看到更新前的数据内容,也同为数据List结构。

应用实践

数据增量同步

  • 实践背景

    在某项目交付过程中,客户希望解决一个数据同步问题,要求实时性较高,性能影响尽可能少地减少。同时,由于来源数据库为一个外部维护的数据库,且无法直接使用到新建设的业务系统中作为业务数据库。旧的解决方案进行每日同步的频次执行,在业务量较少的时间段进行一次大规模的数据查询再插入的动作。旧的方案会导致数据同步滞后时间为一天,且全量的查询动作对外部数据库产生较大的使用影响,遭到外部维护方的查询限制,同步执行时间也比较长。

  • 数据同步实时化

    为了使数据能够进行实时同步,决定使用Canal接入到外部数据库,然后把Canal监听的BinaryLog接入到新建设的MySQL库中,使得两边的数据库数据同步延迟仅有秒级差异。Canal的接入也使得每日执行的同步任务得以取消,减少了额外的系统维护工作。而且BinaryLog的监听推送对外部数据库性能来说影响较少。

  • 增量数据投递消费

    此外,Canal投递消费能力能够拓展数据增量改动的体现形式。Canal把感知到的数据库变动内容投递到指定的MQ Topic,为后续的消费途径提供多样性。如:Canal订阅指定数据表的变动数据投递到Datahub中,投递的内容就如上面的数据结构展示。允许借助Blink计算平台对数据进行感知整合,实现业务场景的下聚合统计等实时计算诉求;也能够开放Datahub的Topic订阅权限,把增量数据的变动开发到指定使用者,提供实时数据变动推送。

image.png

监听字段更新

  • 实践背景

    在大屏项目建设过程中,大量的指标数据维护同一指标表中是常见的处理手段。但指标数据的更新来源比较复杂,有数据开发同学进行写入,有外部系统进行推送。客户要求对大屏指标的实时性与有效性进行保证,但大量的指标更新情况无法有效监控。

  • 解决方案

    使用Canal对指定的指标表进行监听,对指标表的更新数据BinaryLog进行解析,然后以日志形式记录。针对每一条数据内容能够识别到具体的指标,把当前更新的数据信息记录到库表中。再按照对应的指标更新要求,感知更新日志表的数据库,就能够确保及时知道指标的更新频次是否符合预期,指标数据每次更新的数据内容,做到更新频次可监控,更新数据变动可追溯。

image.png

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
4月前
|
安全 关系型数据库 MySQL
PHP与MySQL交互:从入门到实践
【9月更文挑战第20天】在数字时代的浪潮中,掌握PHP与MySQL的互动成为了开发动态网站和应用程序的关键。本文将通过简明的语言和实例,引导你理解PHP如何与MySQL数据库进行对话,开启你的编程之旅。我们将从连接数据库开始,逐步深入到执行查询、处理结果,以及应对常见的挑战。无论你是初学者还是希望提升技能的开发者,这篇文章都将为你提供实用的知识和技巧。让我们一起探索PHP与MySQL交互的世界,解锁数据的力量!
|
4月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
888 4
|
23天前
|
缓存 关系型数据库 MySQL
MySQL 索引优化与慢查询优化:原理与实践
通过本文的介绍,希望您能够深入理解MySQL索引优化与慢查询优化的原理和实践方法,并在实际项目中灵活运用这些技术,提升数据库的整体性能。
66 5
|
1月前
|
存储 关系型数据库 MySQL
PHP与MySQL动态网站开发:从基础到实践####
本文将深入探讨PHP与MySQL的结合使用,展示如何构建一个动态网站。通过一系列实例和代码片段,我们将逐步了解数据库连接、数据操作、用户输入处理及安全防护等关键技术点。无论您是初学者还是有经验的开发者,都能从中获益匪浅。 ####
|
2月前
|
关系型数据库 MySQL Java
MySQL索引优化与Java应用实践
【11月更文挑战第25天】在大数据量和高并发的业务场景下,MySQL数据库的索引优化是提升查询性能的关键。本文将深入探讨MySQL索引的多种类型、优化策略及其在Java应用中的实践,通过历史背景、业务场景、底层原理的介绍,并结合Java示例代码,帮助Java架构师更好地理解并应用这些技术。
60 2
|
2月前
|
关系型数据库 MySQL Linux
Linux环境下MySQL数据库自动定时备份实践
数据库备份是确保数据安全的重要措施。在Linux环境下,实现MySQL数据库的自动定时备份可以通过多种方式完成。本文将介绍如何使用`cron`定时任务和`mysqldump`工具来实现MySQL数据库的每日自动备份。
151 3
|
2月前
|
存储 监控 关系型数据库
MySQL自增ID耗尽解决方案:应对策略与实践技巧
在MySQL数据库中,自增ID(AUTO_INCREMENT)是一种特殊的属性,用于自动为新插入的行生成唯一的标识符。然而,当自增ID达到其最大值时,会发生什么?又该如何解决?本文将探讨MySQL自增ID耗尽的问题,并提供一些实用的解决方案。
56 1
|
1月前
|
SQL 关系型数据库 MySQL
PHP与MySQL的高效交互:从基础到实践####
本文深入探讨了PHP与MySQL数据库之间的高效交互技术,涵盖了从基础连接到高级查询优化的全过程。不同于传统的摘要概述,这里我们直接以一段精简代码示例作为引子,展示如何在PHP中实现与MySQL的快速连接与简单查询,随后文章将围绕这一核心,逐步展开详细讲解,旨在为读者提供一个从入门到精通的实战指南。 ```php <?php // 数据库配置信息 $servername = "localhost"; $username = "root"; $password = "password"; $dbname = "test_db"; // 创建连接 $conn = new mysqli($se
30 0
|
3月前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:百万级数据统计优化实践
【10月更文挑战第21天】 在处理大规模数据集时,传统的单体数据库解决方案往往力不从心。MySQL和Redis的组合提供了一种高效的解决方案,通过将数据库操作与高速缓存相结合,可以显著提升数据处理的性能。本文将分享一次实际的优化案例,探讨如何利用MySQL和Redis共同实现百万级数据统计的优化。
133 9
|
3月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
226 0

推荐镜像

更多