基于Spark Streaming 进行 MySQL Binlog 日志准实时传输

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 基本架构 RDS -> SLS -> Spark Streaming -> Spark HDFS 上述链路主要包含3个过程: 如何把 RDS 的 binlog 收集到 SLS。 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。

基本架构

RDS -> SLS -> Spark Streaming -> Spark HDFS

上述链路主要包含3个过程:

  1. 如何把 RDS 的 binlog 收集到 SLS。
  2. 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
  3. 如何把链路 2 中读取和处理过的日志,保存到 Spark HDFS中。

环境准备

  1. 安装一个 MySQL 类型的数据库(使用 MySQL 协议,例如 RDS、DRDS 等),开启 log-bin 功能,且配置 binlog 类型为 ROW 模式(RDS默认开启)。
  2. 开通 SLS 服务。

操作步骤

  1. 检查 MySQL 数据库环境。

    1. 查看是否开启 log-bin 功能。
    mysql> show variables like "log_bin";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.02 sec)
    1. 查看 binlog 类型
    mysql> show variables like "binlog_format";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.03 sec)  
  2. 添加用户权限。(也可以直接通过RDS控制台添加)

    CREATE USER canal IDENTIFIED BY ‘canal’;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;FLUSH PRIVILEGES;
  3. 为 SLS 服务添加对应的配置文件,并检查数据是否正常采集。

    1. 在 SLS 控制台添加对应的 project 和 logstore,例如:创建一个名称为 canaltest 的 project,然后创建一个名称为 canal 的 logstore。
    2. 对 SLS 进行配置:在 /etc/ilogtail 目录下创建文件user_local_config.json,具体配置如下:
    {
    "metrics": {
     "##1.0##canaltest$plugin-local": {
         "aliuid": "****",
         "enable": true,
         "category": "canal",
         "defaultEndpoint": "*******",
         "project_name": "canaltest",
         "region": "cn-hangzhou",
         "version": 2
         "log_type": "plugin",
         "plugin": {
             "inputs": [
                 {
                     "type": "service_canal",
                     "detail": {
                         "Host": "*****",
                         "Password": "****",
                         "ServerID": ****,
                         "User" : "***",
                         "DataBases": [
                             "yourdb"
                         ],
                         "IgnoreTables": [
                             "\\S+_inner"
                         ],
                          "TextToString" : true
                     }
                 }
             ],
             "flushers": [
                 {
                     "type": "flusher_sls",
                     "detail": {}
                 }
             ]
         }
     }
    }
    }

    其中 detail 中的 Host 和 Password 等信息为 MySQL 数据库信息,User 信息为之前授权过的用户名。aliUid、defaultEndpoint、project_name、category 请根据自己的实际情况填写对应的用户和 SLS 信息。

    1. 等待约 2 分钟,通过 SLS 控制台查看日志数据是否上传成功,具体如图所示。
      image

如果日志数据没有采集成功,请根据SLS的提示,查看SLS的采集日志进行排查。

  1. 准备代码,将代码编译成 jar 包,然后上传到 OSS。

    1. 将 EMR 的示例代码通过 git 复制下来,然后进行修改,具体命令为:
    git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。

    示例代码中已经有 LoghubSample 类,该类主要用于从 SLS 采集数据并打印。以下是修改后的代码,供参考:

    package com.aliyun.emr.example
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    object LoghubSample {
    def main(args: Array[String]): Unit = {
    if (args.length < 7) {
     System.err.println(
       """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
         |            
         |           
       """.stripMargin)
     System.exit(1)
    }
    val loghubProject = args(0)
    val logStore = args(1)
    val loghubGroupName = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    val conf = new SparkConf().setAppName("Mysql Sync")
    //    conf.setMaster("local[4]");
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
     ssc,
     loghubProject,
     logStore,
     loghubGroupName,
     endpoint,
     1,
     accessKeyId,
     accessKeySecret,
     StorageLevel.MEMORY_AND_DISK)
    loghubStream.foreachRDD(rdd =>
       rdd.saveAsTextFile("/mysqlbinlog")
    )
    ssc.start()
    ssc.awaitTermination()
    }
    }

其中的主要改动是:

loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) )

这样在 EMR 集群中运行时,就会把Spark Streaming 中流出来的数据,保存到 EMR 的 HDFS 中。

  1. 说明
    由于如果要在本地运行,请在本地环境提前搭建 Hadoop 集群。

由于 EMR 的 Spark SDK 做了升级,其示例代码比较旧,不能直接在参数中传递 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通过 SparkConf 进行设置,如下所示。

trait RunLocally {
val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.mapreduce.job.run-local", "true")
conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
conf.set("spark.hadoop.job.runlocal", "true")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
val sc = new SparkContext(conf)
def getAppName: String
}

在本地调试时,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) ) 中的 /mysqlbinlog 修改成本地 HDFS的地址。

  1. 代码编译。
    在本地调试完成后,我们可以通过如下命令进行打包编译:

  2. clean install

  3. 上传 jar 包。
    请先在 OSS 上建立 bucket 为 qiaozhou-EMR/jar的目录,然后通过OSS 控制台或 OSS 的 SDK 将 /target/shaded目录下的 examples-1.1-shaded.jar上传到 OSS 的这个目录下。上传后的 jar 包地址为 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,这个地址在后面会用上,如下图所示:

image

  1. 搭建 EMR 集群,创建任务并运行执行计划。

    1. 通过 EMR 控制台创建一个 EMR 集群,大约需要 10 分钟左右,请耐心等待。
    2. 创建一个类型为 Spark 的作业。
      请根据您具体的配置将 SLS_endpoint $SLS_access_id $SLS_secret_key 替换成真实值。请注意参数的顺序,否则可能会报错。
    —master yarn —deploy-mode client —driver-memory 4g —executor-memory 2g —executor-cores 2 —class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1

运行以上的命令

  1. 查询 Master 节点的IP
  2. 通过 SSH 登录后,执行以下命令:

  3. fs -ls /

  4. 可以看到 mysqlbinlog 开头的目录,再通过以下命令查看 mysqlbinlog 文件:

  5. fs -ls /mysqlbinlog

还可以通过 hadoop fs -cat /mysqlbinlog/part-00000 命令查看文件内容。

  1. 错误排查。
    如果没有看到正常的结果,可以登陆节点,查看对应的作业的错误情况。
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
存储 SQL 关系型数据库
mysql 的ReLog和BinLog区别
MySQL中的重做日志和二进制日志是确保数据库稳定性和可靠性的关键组件。重做日志主要用于事务的持久性和原子性,通过记录数据页的物理修改信息来恢复未提交的事务;而二进制日志记录SQL语句的逻辑变化,支持数据复制、恢复和审计。两者在写入时机、存储方式及配置参数等方面存在显著差异。
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
56 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
114 0
|
17天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE &#39;log_%&#39;;`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
60 2
|
1月前
|
SQL 存储 缓存
MySQL进阶突击系列(02)一条更新SQL执行过程 | 讲透undoLog、redoLog、binLog日志三宝
本文详细介绍了MySQL中update SQL执行过程涉及的undoLog、redoLog和binLog三种日志的作用及其工作原理,包括它们如何确保数据的一致性和完整性,以及在事务提交过程中各自的角色。同时,文章还探讨了这些日志在故障恢复中的重要性,强调了合理配置相关参数对于提高系统稳定性的必要性。
|
2月前
|
关系型数据库 MySQL 数据库
【赵渝强老师】MySQL的binlog日志文件
MySQL的binlog日志记录了所有对数据库的更改操作(不包括SELECT和SHOW),主要用于主从复制和数据恢复。binlog有三种模式,可通过设置binlog_format参数选择。示例展示了如何启用binlog、设置格式、查看日志文件及记录的信息。
192 6
|
2月前
|
存储 SQL 关系型数据库
mysql 的ReLog和BinLog区别
MySQL中的重做日志(Redo Log)和二进制日志(Binary Log)是两种重要的日志系统。重做日志主要用于保证事务的持久性和原子性,通过记录数据页的物理修改信息来恢复未提交的事务更改。二进制日志则记录了数据库的所有逻辑变化操作,用于数据的复制、恢复和审计。两者在写入时机、存储方式、配置参数和使用范围上有所不同,共同确保了数据库的稳定性和可靠性。
|
2月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
3月前
|
存储 关系型数据库 MySQL
MySQL中的Redo Log、Undo Log和Binlog:深入解析
【10月更文挑战第21天】在数据库管理系统中,日志是保障数据一致性和完整性的关键机制。MySQL作为一种广泛使用的关系型数据库管理系统,提供了多种日志类型来满足不同的需求。本文将详细介绍MySQL中的Redo Log、Undo Log和Binlog,从背景、业务场景、功能、底层实现原理、使用措施等方面进行详细分析,并通过Java代码示例展示如何与这些日志进行交互。
338 0
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
70 0
下一篇
开通oss服务