Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch

mysqlsmom 文档: https://mysqlsmom.readthedocs.io/en/latest/hello.html

github: https://github.com/m358807551/mysqlsmom


环境要求:

1、python2.7
2、redis
3、Mysql 配置 binlog-format=row

安装

pip install mysqlsmom

全量同步

# 创建全量同步配置文件
$ mom new test_mom/init_config.py -t init --force
# 编辑配置文件
$ vim ./test_mom/init_config.py  # 按注释提示修改配置
# 开始同步
$ mom run -c ./test_mom/init_config.py

增量同步

配置三个文件

test_mom
├── binlog_config.py   # 配置文件
├── my_filters.py         # 过滤器 配置于 watched 
└── my_handlers.py    # 处理器 配置于 pipeline

新建配置

mom new test_mom/binlog_config.py -t binlog --force

1、binlog_config.py


# coding=utf-8
STREAM = "BINLOG"  # "BINLOG" or "INIT"
SERVER_ID = 99
SLAVE_UUID = __name__
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
BINLOG_CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '123456'
}
# redis存储上次同步位置等信息
REDIS = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 0,
    # "password": "password",  # 不需要密码则注释或删掉该行
}
NODES = [{"host": "127.0.0.1", "port": 9200}]
TASKS = [
    {
        "stream": {
            "database": "demo",
            "table": "student"
        },
        "jobs": [{
            "actions": ["insert", "update"],
            "watched": {
                "filter_display": {}
            },
            "pipeline": [
                {"only_fields": {"fields": ["id", "name", "age"]}},
                {"change_name": {"key": "name", "prefix": "hot-"}},
                {"set_id": {"field": "id"}}
            ],
            "dest": {
                "es": {
                    "action": "upsert",
                    "index": "demo",
                    "type": "student",
                    "nodes": NODES
                }
            }
        },
            {
                "actions": ["delete"],
                "pipeline": [
                    # {"only_fields": {"fields": ["id", "name", "age"]}},
                    {"set_id": {"field": "id"}}
                ],
                "dest": {
                    "es": {
                        "action": "delete",
                        "index": "demo",
                        "type": "student",
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]
CUSTOM_ROW_HANDLERS = "./my_handlers.py"
CUSTOM_ROW_FILTERS = "./my_filters.py"

自定义处理器 my_handlers.py


# -*- coding: utf-8 -*-
import copy
def change_name(row, key, prefix):
    new_row = copy.deepcopy(row)
    new_row[key] = "{}{}".format(prefix, row[key])
    # 返回数据字典,下一工序继续处理
    return new_row

自定义过滤器 my_filters.py


# -*- coding: utf-8 -*-
def filter_display(event):
  # 返回True 或 False,使用或丢弃
    return event["values"]["display"] == 1

启动

mom run -c test_mom/binlog_config.py
相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
1月前
|
缓存 JavaScript 前端开发
Nginx缓存优化配置(手把手教你提升网站加载速度)
本文介绍如何通过Nginx缓存优化网站性能,涵盖代理缓存与静态资源缓存的配置方法,帮助小白快速掌握缓存设置、验证及清理技巧,提升访问速度与服务器效率。
|
编解码 NoSQL Java
使用Spring Boot + Redis 队列实现视频文件上传及FFmpeg转码的技术分享
【8月更文挑战第30天】在当前的互联网应用中,视频内容的处理与分发已成为不可或缺的一部分。对于视频平台而言,高效、稳定地处理用户上传的视频文件,并对其进行转码以适应不同设备的播放需求,是提升用户体验的关键。本文将围绕使用Spring Boot结合Redis队列技术来实现视频文件上传及FFmpeg转码的过程,分享一系列技术干货。
1050 4
|
11月前
|
JavaScript 前端开发 数据可视化
20.6K star!Excel级交互体验!这款开源Web表格神器绝了!
Handsontable 是一款功能强大的 JavaScript 数据表格组件,提供类 Excel 的交互体验。支持实时协作、数据绑定、公式计算等企业级功能,可轻松集成到 React/Vue/Angular 等主流框架。
2175 11
|
关系型数据库 MySQL API
MySQL 历史数据迁移到 Elasticsearch
MySQL 历史数据迁移到 Elasticsearch
578 4
|
存储 前端开发 JavaScript
|
人工智能 自然语言处理 安全
claude国内怎么用?教你两种claude国内使用方法!
Claude AI 是由 Anthropic 公司开发的一款新一代 AI 助手,旨在成为更安全、更友好、更可靠的 AI 系统。它基于 Anthropic 对 AI 安全性的深入研究,并采用 “Constitutional AI” (宪法式 AI) 的训练方法,使其行为更符合人类价值观,并减少有害输出的可能性。 🛡️
|
Web App开发 数据采集 数据挖掘
还有这种骚操作:使用Golang实现无头浏览器浏览和截图
还有这种骚操作:使用Golang实现无头浏览器浏览和截图
1221 0
|
SQL 缓存 搜索推荐
Gorm学习(三)基础:迁移(数据库建表以及字段设置)
在项目开发中,我们可能会随时调整声明的模型,比如添加字段和索引,使用 GORM 的自动迁移功能,可以始终让我们的数据库表结构保持最新。
1832 0
Gorm学习(三)基础:迁移(数据库建表以及字段设置)
|
存储 SQL JSON
5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(一)
5、DataX(DataX简介、DataX架构原理、DataX部署、使用、同步MySQL数据到HDFS、同步HDFS数据到MySQL)(一)