Python:mysql-replication监控MySQL的binlog变动

简介: Python:mysql-replication监控MySQL的binlog变动

Github: https://github.com/noplay/python-mysql-replication


设置同步账号权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';

# 刷新权限
flush privileges;

参考

利用Python my-replication读取mysql的binlog

[mysql]mysql grant 用户权限总结


安装

pip install mysql-replication

代码示例

# -*- coding: utf-8 -*-


import datetime
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
)


class DateEncoder(json.JSONEncoder):
"""
自定义类,解决报错:
TypeError: Object of type 'datetime' is not JSON serializable
"""

def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')

elif isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d")

else:
return json.JSONEncoder.default(self, obj)


# 配置数据库信息
mysql_settings = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': '123456'
}


def main():
# 实例化binlog 流对象
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=100, # slave标识,唯一
blocking=True, # 阻塞等待后续事件
# 设定只监控写操作:增、删、改
only_events=[
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
]
)

for binlogevent in stream:
# binlogevent.dump() # 打印所有信息

for row in binlogevent.rows:
# 打印 库名 和 表名
event = {"schema": binlogevent.schema, "table": binlogevent.table}

if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["data"] = row["values"]

elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["data"] = row["after_values"] # 注意这里不是values

elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["data"] = row["values"]

print(json.dumps(event, cls=DateEncoder))
# sys.stdout.flush()

# stream.close() # 如果使用阻塞模式,这行多余了


if name == '__main__':
main()
"""
输出数据格式
{
"schema": "demo", # 数据库名
"table": "student", # 表名
"action": "update", # 动作 insert、delete、update
"data": { # 数据,里边包含所有字段
"id": 26,
"name": "haha",
"age": 34,
"update_time": "2019-06-06 16:59:06",
"display": 0
}
}
"""



            </div>
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
数据可视化 测试技术
深入理解软件测试中的风险评估方法
【4月更文挑战第19天】 在软件开发的生命周期中,风险评估是确保产品质量和项目成功的关键步骤。本文将探讨几种常用的软件测试风险评估方法,包括定性分析和定量分析,并讨论它们在不同类型的测试环境中的应用。通过案例研究和最佳实践,我们将展示如何有效识别、评估和管理测试过程中可能遇到的风险,以及如何制定相应的缓解策略,以优化资源分配和提高测试效率。
824 0
|
Unix Linux
grep显示匹配行及其行号
grep显示匹配行及其行号
571 2
|
机器学习/深度学习 计算机视觉
YOLOv5改进 | Conv篇 | 在线重参数化卷积OREPA助力二次创新(提高推理速度 + FPS)
YOLOv5改进 | Conv篇 | 在线重参数化卷积OREPA助力二次创新(提高推理速度 + FPS)
421 2
|
小程序 前端开发 API
一文就知道uniapp等跨端开发的使用场景,学习成本,如何快速使用,基本语法等
uniapp是一个跨平台开发各种各样应用的一套框架。只需要写一套代码,可以适配多达14种产品类型,比如H5移动端、微信小程序及各种其他小程序,ios、安卓等接近原生APP的应用(可以上架到App Store或应用商店)。所以这里的多端,指的并不是PC、平板、手机端,而是移动端优先,开发者可以一次编码,分别编译为小程序和 Android 以及 iOS 应用,实现多端开发
1147 0
|
JavaScript 前端开发
CMD和UMD,ES Module的差别
CMD和UMD,ES Module的差别
|
Rust 开发者
Rust中的模块与包管理:构建高效、可扩展的代码库
本文详细阐述了Rust编程语言中模块与包管理的概念、特点和使用方法。通过深入了解模块与包的概念、组织方式、导入导出机制以及Rust的Cargo工具,我们将学会如何构建高效、可扩展的代码库,提高代码的可读性、可维护性和可重用性。
|
SQL 数据库连接 API
SqlAlchemy 2.0 中文文档(五十五)(3)
SqlAlchemy 2.0 中文文档(五十五)
515 1
|
数据采集 人工智能 边缘计算
|
存储 前端开发 JavaScript
强烈推荐一个Python库!制作Web Gui也太简单了!
强烈推荐一个Python库!制作Web Gui也太简单了!
793 0