Flink查询问题之每秒入库到mysql数量很少如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink sql 读取mysql

版本:flink 1.10 mysql 5.7.24

需求场景是: 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?

现在本地测试时,维表的DDL是:

但是去mysql修改了数据后,join操作还是旧数据.

望大神们指点方向,提前谢谢了.*来自志愿者整理的flink邮件归档



参考答案:

这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371510?spm=a2c6h.12873639.article-detail.22.29d04378ApxdqJ



问题二:Apache Flink常见问题汇总【精品问答】

hi

我这面在使用sql api解析kafka

json数据,在创建表的时候发现json数据解析的时候有下面两项,这两项如果开启那么解析失败的数据是会被丢掉吗,有没有方式可以把解析失败的数据打到外部存储

json.ignore-parse-errors

son.fail-on-missing-field*来自志愿者整理的flink邮件归档



参考答案:

我理解应该做不到,因为这两个format参数在format里就做的。

json.ignore-parse-errors 是在 format解析时跳过解析失败的数据继续解析下一行,json.fail-on-missing-field 是标记如果字段少时是否失败还是继续(缺少的字段用null补上)

这两个不能同时为ture,语义上就是互斥的。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371508?spm=a2c6h.12873639.article-detail.23.29d04378ApxdqJ



问题三:https://developer.aliyun.com/ask/371504

请问实现了 MetricReporter, CharacterFilter,Scheduled, Reporter 的自定义

customerReporter 如何能在 代码env里面注册并实现metric上报,要求不在flink conf.xml 文件里面配置

该customerReporter的信息?

需求:在自定义的source 和sink等算子里面计算处理成功,失败的数据条数并通过自定义reporter上报,并且该reporter需要是通用型的即

*适用于多个flink

任务*从而避开重复造轮子。*来自志愿者整理的flink邮件归档



参考答案:

尝试理解一下你的需求,你自己实现了一个 report,也希望在 source 和 sink 中计算一些 metric,希望把 source 和sink 的这些 metric 通过自定义的 report 上报到你指定的地方。然后不希望在 env 里面配置 report 的信息,是这样吗?能否解释下为什么不希望在 flink-conf 中进行配置,而是希望在 env 中进行配置吗*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371506?spm=a2c6h.12873639.article-detail.24.29d04378ApxdqJ



问题四:flink1.11查询结果每秒入库到mysql数量很少

各位大佬好,请教一个问题,在使用flink1.11消费kafka数据,查询结果写入到mysql库表时,发现读取kafka的速度很快(300条/秒),但是查询结果每秒写入mysql的条数只有6条左右,请问这是怎么回事,以及优化的点在哪里?下面是我的代码。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source_Kafka = """

CREATE TABLE kafka_source (

id VARCHAR,

alarm_id VARCHAR,

trck_id VARCHAR

) WITH (

'connector' = 'kafka',

'topic' = 'test',  

'properties.bootstrap.servers' = '*',

'properties.group.id' = 'flink_grouper',

'scan.startup.mode' = 'earliest-offset',    

'format' = 'json',

'json.fail-on-missing-field' = 'false',

'json.ignore-parse-errors' = 'true'

)

"""

source_W_detail_ddl = """

CREATE TABLE source_W_detail (

id VARCHAR,    

alarm_id VARCHAR,    

trck_id VARCHAR    

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',

'driver' = 'com.mysql.cj.jdbc.Driver',

'table-name' = 'detail',

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.max-rows' = '1000',

'sink.buffer-flush.interval' = '2s'

"""

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source_Kafka)

t_env.execute_sql(source_W_detail_ddl)

table_result1=t_env.execute_sql('''insert into source_W_detail select id,alarm_id,trck_id from kafka_source''')

table_result1.get_job_client().get_job_execution_result().result()*来自志愿者整理的flink邮件归档



参考答案:

你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewritebatchedstatements=true

MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371504?spm=a2c6h.12873639.article-detail.26.29d04378ApxdqJ

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
SQL 缓存 监控
MySQL缓存机制:查询缓存与缓冲池优化
MySQL缓存机制是提升数据库性能的关键。本文深入解析了MySQL的缓存体系,包括已弃用的查询缓存和核心的InnoDB缓冲池,帮助理解缓存优化原理。通过合理配置,可显著提升数据库性能,甚至达到10倍以上的效果。
|
5月前
|
SQL 存储 关系型数据库
MySQL体系结构详解:一条SQL查询的旅程
本文深入解析MySQL内部架构,从SQL查询的执行流程到性能优化技巧,涵盖连接建立、查询处理、执行阶段及存储引擎工作机制,帮助开发者理解MySQL运行原理并提升数据库性能。
|
5月前
|
SQL 关系型数据库 MySQL
MySQL的查询操作语法要点
储存过程(Stored Procedures) 和 函数(Functions) : 储存过程和函数允许用户编写 SQL 脚本执行复杂任务.
266 14
|
5月前
|
SQL 关系型数据库 MySQL
MySQL的查询操作语法要点
以上概述了MySQL 中常见且重要 的几种 SQL 查询及其相关概念 这些知识点对任何希望有效利用 MySQL 进行数据库管理工作者都至关重要
152 15
|
5月前
|
SQL 监控 关系型数据库
SQL优化技巧:让MySQL查询快人一步
本文深入解析了MySQL查询优化的核心技巧,涵盖索引设计、查询重写、分页优化、批量操作、数据类型优化及性能监控等方面,帮助开发者显著提升数据库性能,解决慢查询问题,适用于高并发与大数据场景。
|
5月前
|
SQL 关系型数据库 MySQL
MySQL入门指南:从安装到第一个查询
本文为MySQL数据库入门指南,内容涵盖从安装配置到基础操作与SQL语法的详细教程。文章首先介绍在Windows、macOS和Linux系统中安装MySQL的步骤,并指导进行初始配置和安全设置。随后讲解数据库和表的创建与管理,包括表结构设计、字段定义和约束设置。接着系统介绍SQL语句的基本操作,如插入、查询、更新和删除数据。此外,文章还涉及高级查询技巧,包括多表连接、聚合函数和子查询的应用。通过实战案例,帮助读者掌握复杂查询与数据修改。最后附有常见问题解答和实用技巧,如数据导入导出和常用函数使用。适合初学者快速入门MySQL数据库,助力数据库技能提升。
|
5月前
|
SQL 监控 关系型数据库
MySQL高级查询技巧:子查询、联接与集合操作
本文深入解析了MySQL高级查询的核心技术,包括子查询、联接和集合操作,通过实际业务场景展示了其语法、性能差异和适用场景,并提供大量可复用的代码示例,助你从SQL新手进阶为数据操作高手。
|
5月前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
444 158
|
5月前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。
|
5月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
1040 152

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多