Canal实现mysql数据与redis同步

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 Tair(兼容Redis),内存型 2GB
简介: Canal实现mysql数据与redis同步

Canal实现mysql数据与redis同步

1.Canal简介

canal [kə'næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

2.项目背景

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

基于日志增量订阅和消费支持的业务包括:

1.数据库镜像

2.数据库实时备份

3.多级索引 (卖家和买家各自分库索引)

4.search build

5.业务cache刷新

6.带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

3.工作原理

3.1.MySQL主从复制原理

1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

2.MySQL slave master binary log events 拷贝到它的中继日志(relay log)

3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

3.2.Canal工作原理

1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

2.MySQL master 收到 dump 请求,开始推送 binary log slave ( canal )

3.canal 解析 binary log 对象(原始为 byte )

实现步骤

MySQL配置

我使用的是mysql5.6.x,修改mysql的配置文件my.ini,加入下面内容

日志存放路径

general_log=ON

general_log_file=G:\mysql-cluster\master\master.log

#default-character-set=utf8

character-set-server=utf8

default-storage-engine=InnoDB

#innodb_force_recovery = 1

# server_id = .....

log_bin=mysql-bin

binlog-format=ROW

server-id=1

需要同步的数据库名称

binlog-do-db=test_canal

忽略的数据库,建议填写

binlog-ignore-db=mysql

配置完后重新启动数据库服务

执行下列授权语句

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT,REPLICATION SLAVE,REPLICATION client ON *.* to 'canal'@'localhost'

FLUSH PRIVILEGES;

 

Canal部署与配置

下载canalhttps://github.com/alibaba/canal/releases/

这里我下载的是1.1.5

下载完成解压即可,修改配置

canal/conf/canal.properties可保持不变,默认的端口51

canal/conf/example/instance.properties需要配置

#################################################

## mysql serverId , v1.0.26+ will autoGen

# canal.instance.mysql.slaveId=0

 

# enable gtid use true/false

这里的id不要跟mysqlserver-id相同

canal.instance.mysql.slaveId=123

canal.instance.gtidon=false

 

# position info

canal.instance.master.address=127.0.0.1:3307

canal.instance.master.journal.name=

canal.instance.master.position=

canal.instance.master.timestamp=

canal.instance.master.gtid=

 

# rds oss binlog

canal.instance.rds.accesskey=

canal.instance.rds.secretkey=

canal.instance.rds.instanceId=

 

# table meta tsdb info

canal.instance.tsdb.enable=false

#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb

#canal.instance.tsdb.dbUsername=canal

#canal.instance.tsdb.dbPassword=canal

 

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

#canal.instance.standby.gtid=

 

# username/password

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

canal.instance.connectionCharset = UTF-8

# enable druid Decrypt database password

canal.instance.enableDruid=false

#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

 

# table regex

canal.instance.filter.regex=.*\\..*

# table black regex

canal.instance.filter.black.regex=mysql\\.slave_.*

# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

 

# mq config

canal.mq.topic=example

# dynamic topic route by schema or table regex

#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*

canal.mq.partition=0

# hash partition config

#canal.mq.partitionsNum=3

#canal.mq.partitionHash=test.table:id^name,.*\\..*

#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6

#################################################

 

配置完成后启动canal,我这里是windows系统,直接双击bin/startup.bat即可

查看是否正常启动,需要看2个日志文件

logs/canal/canal.log文件中有如下内容:the canal server is running now ......

logs/example/example.log文件中有如下内容:start successful....

证明启动成功

Javacanal客户端

创建maven工程

导入pom依赖

<dependency>

    <groupId>com.alibaba.otter</groupId>

    <artifactId>canal.client</artifactId>

    <version>1.1.0</version>

</dependency>

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

Redis工具类

package cn.demo.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.IOException;
import java.util.Properties;

/**
 * 获取连接池对象 */
public enum RedisUtils {
    INSTANCE;
    static JedisPool jedisPool = null;

    static {
        //1 创建连接池配置对象        JedisPoolConfig config = new JedisPoolConfig();
        //2 进行配置-四个配置        config.setMaxIdle(1);//最小连接数        config.setMaxTotal(11);//最大连接数        config.setMaxWaitMillis(10 * 1000L);//最长等待时间        config.setTestOnBorrow(true);//测试连接时是否畅通        //3 通过配置对象创建连接池对象        Properties properties = null;
        try {
            properties = new Properties();
            properties.load(RedisUtils.class.getClassLoader().getResourceAsStream("redis.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        String host = properties.getProperty("redis.host");
        String port = properties.getProperty("redis.port");
        String password = properties.getProperty("redis.password");
        String timeout = properties.getProperty("redis.timeout");
        System.out.println(host);
        System.out.println(port);
        System.out.println(password);
        System.out.println(timeout);
        jedisPool = new JedisPool(config, host, Integer.valueOf(port),Integer.valueOf(timeout), password);
    }

    //获取连接    public Jedis getSource() {
        return jedisPool.getResource();
    }

    //关闭资源    public void closeSource(Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }

    }
    /**
     * 设置字符值     *
     * @param key
     * @param
     */
    public void del(String key) {
        Jedis jedis = getSource();
        jedis.del(key);
        closeSource(jedis);
    }
    /**
     * 设置字符值     *
     * @param key
     * @param value
     */
    public void set(String key, String value) {
        Jedis jedis = getSource();
        jedis.set(key, value);
        closeSource(jedis);
    }

    /**
     * 设置     * @param key
     * @param value
     */
    public void set(byte[] key, byte[] value) {
        Jedis jedis = getSource();
        jedis.set(key, value);
        closeSource(jedis);
    }

    /**
     *
     * @param key
     * @return
     */
    public byte[]  get(byte[] key) {
        Jedis jedis = getSource();
        try {
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeSource(jedis);
        }
        return null;

    }

    /**
     * 设置字符值     *
     * @param key
     */
    public String get(String key) {
        Jedis jedis = getSource();
        try {
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeSource(jedis);
        }

        return null;

    }

    public void set(String key, String value, Integer time) {
        Jedis jedis = getSource();
        jedis.setex(key,time,value);
        closeSource(jedis);
    }
}

Canal客户端测试类

package cn.demo.test;

import cn.demo.util.RedisUtils;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * canal测试 */
public class SimpleCanalClientExample {


    public static void main(String args[]) {
        // 创建链接        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认                // connector.rollback(batchId); // 处理失败回滚数据            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    //printColumn(rowData.getBeforeColumnsList());
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    //printColumn(rowData.getAfterColumnsList());
                    redisInsertOrUpdate(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    //printColumn(rowData.getAfterColumnsList());
                    redisInsertOrUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }
    // 修改redis
    private static void redisInsertOrUpdate(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtils.INSTANCE.set("user:" + columns.get(0).getValue(), json.toJSONString());
        }
    }
    // 删除redis
    private static void redisDelete(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtils.INSTANCE.del("user:" + columns.get(0).getValue());
        }
    }



    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

测试效果,触发数据库变更

执行sql语句查看redis相应的变化

经过测试,mysql的数据已经同步到redis,测试成功

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
9天前
|
NoSQL 关系型数据库 Redis
《docker高级篇(大厂进阶):1.Docker复杂安装详说》包括:安装mysql主从复制、安装redis集群
《docker高级篇(大厂进阶):1.Docker复杂安装详说》包括:安装mysql主从复制、安装redis集群
54 14
|
6天前
|
关系型数据库 MySQL 应用服务中间件
《docker基础篇:8.Docker常规安装简介》包括:docker常规安装总体步骤、安装tomcat、安装mysql、安装redis
《docker基础篇:8.Docker常规安装简介》包括:docker常规安装总体步骤、安装tomcat、安装mysql、安装redis
43 7
|
18天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
64 16
|
30天前
|
NoSQL Java 关系型数据库
Liunx部署java项目Tomcat、Redis、Mysql教程
本文详细介绍了如何在 Linux 服务器上安装和配置 Tomcat、MySQL 和 Redis,并部署 Java 项目。通过这些步骤,您可以搭建一个高效稳定的 Java 应用运行环境。希望本文能为您在实际操作中提供有价值的参考。
134 26
|
16天前
|
NoSQL 关系型数据库 MySQL
Linux安装jdk、mysql、redis
Linux安装jdk、mysql、redis
144 7
|
26天前
|
存储 关系型数据库 MySQL
mysql怎么查询longblob类型数据的大小
通过本文的介绍,希望您能深入理解如何查询MySQL中 `LONG BLOB`类型数据的大小,并结合优化技术提升查询性能,以满足实际业务需求。
93 6
|
1月前
|
SQL 关系型数据库 MySQL
mysql分页读取数据重复问题
在服务端开发中,与MySQL数据库进行数据交互时,常因数据量大、网络延迟等因素需分页读取数据。文章介绍了使用`limit`和`offset`参数实现分页的方法,并针对分页过程中可能出现的数据重复问题进行了详细分析,提出了利用时间戳或确保排序规则绝对性等解决方案。
|
2月前
|
关系型数据库 MySQL 数据库
GBase 数据库如何像MYSQL一样存放多行数据
GBase 数据库如何像MYSQL一样存放多行数据
|
20天前
|
存储 缓存 NoSQL
解决Redis缓存数据类型丢失问题
解决Redis缓存数据类型丢失问题
164 85
|
3月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
87 6
下一篇
开通oss服务