Flink数据问题之连接mysql无数据输出如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink CDC 遇到关于不发生packet造成的卡顿问题该如何解决

主要是为了实现解析自定义的schema,sink端好输出到下游。

想请教一个问题:

https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job

看了上面这个链接关于为每个作业设置一个differnet server id的问题。我看sql可以指定不同的server id,所以有下面这三个疑惑:

1、 如果是不同的stream 任务 的它的server id是不是同一个?

2、不同的stream 任务 同步同一个数据库的不同表是不是没有问题

3、不同的stream 任务 同步同一个数据库的同一张表是不是有问题

*来自志愿者整理的flink邮件归档



参考答案:

See the docs: https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370018?spm=a2c6h.13066369.question.83.33bf585fwBnaSf



问题二:flink sql 连接mysql 无数据输出

   我在Idea里用flink-jdbc-connector连接mysql, 建完表后执行env.executeSql("select * from my_table").print()方法,只打印了表头,没有数据是什么原因? flink版本1.11.2

*来自志愿者整理的flink邮件归档



参考答案:

是不是没有加这一行代码,tableEnv.execute("test");

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370017?spm=a2c6h.13066369.question.84.33bf585fAxGR1X



问题三:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问

我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink 1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?     我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;

> >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.Table; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >public class Test1 { > >    public static void main(String[] args) { >        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); >        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); > >        String lTable = "CREATE TABLE l_table (  " + >                " l_a INT,  " + >                " l_b string,  " + >                " l_rt AS localtimestamp,  " + >                " WATERMARK FOR l_rt AS l_rt  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.l_a.min'='1',  " + >                " 'fields.l_a.max'='5',  " + >                " 'fields.l_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(lTable); > >        String rTable = "CREATE TABLE r_table (  " + >                " r_a INT,  " + >                " r_b string,  " + >                " r_pt AS proctime()  " + >                ") WITH (  " + >                " 'connector' = 'datagen',  " + >                " 'rows-per-second'='5',  " + >                " 'fields.r_a.min'='1',  " + >                " 'fields.r_a.max'='5',  " + >                " 'fields.r_b.length'='5'  " + >                ")"; >        bsTableEnv.executeSql(rTable); > >        String printTable = "CREATE TABLE print (" + >                "  l_a INT,  " + >                "  l_b string,  " + >                "  l_rt timestamp(3),  " + >                "  r_a INT,  " + >                "  r_b string,  " + >                "  r_pt timestamp(3)  " + >                ") WITH (  " + >                " 'connector' = 'print' " + >                ") "; > >        bsTableEnv.executeSql(printTable); > >        // 运行成功 >//        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt = r_table.r_pt"); > >        // 运行错误,提示Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. >        Table joinTable = bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND"); > >        bsTableEnv.executeSql("insert into print select * from " + joinTable); > >    } > >}

*来自志愿者整理的flink邮件归档



参考答案:

 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。

  而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。

  Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370016?spm=a2c6h.13066369.question.85.33bf585fSSKbxa



问题四:slot问题

          一个slot同时只能运行一个线程吗?或者1个slot可以同时并行运行多个线程?

*来自志愿者整理的flink邮件归档



参考答案:

有啊,一个slot本身就可以运行多个线程的。但是不可以运行1个算子结点的多个任务,也不可以运行多个作业中的算子结点的多个任务。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370015?spm=a2c6h.13066369.question.84.33bf585f3XoJwI



问题五:flink sql时间戳字段类型转换问题

数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit

中的kafka消息,里面user_behavior消息例如

{"user_id": "470572", "item_id":"3760258", "category_id": "1299190",

"behavior": "pv", "ts": "2017-11-26T01:00:01Z"}

可以看到ts值是 '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下

CREATE TABLE user_log (

user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', -- 'json.timestamp-format.standard' = 'ISO-8601', // 不加这一行默认是'SQL' 'scan.startup.mode' = 'earliest-offset' );

程序运行会抛错

Caused by: java.time.format.DateTimeParseException: Text '2017-11-26T01:00:00Z' could not be parsed at index 10

我查了一下flink json官方文档 https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard

目前只支持两种格式:SQL 和 ISO-8601

其中SQL支持的格式是 'yyyy-MM-dd HH:mm:ss',

而ISO-8601支持的格式是 'yyyy-MM-ddTHH:mm:ss.s{precision}'

确实不支持上面的 'yyyy-MM-ddTHH:mm:ssZ' (注意末尾的Z)

请问:

  1. 像上述时间格式字段在Flink SQL中应该解析成什么类型?
  2. 如果不能直接支持的话是不是得先用VARCHAR类型接收,再利用 UNIX_TIMESTAMP(ts_string,

pattern_string) 函数转成 支持的时间格式?可问题是 pattern_string

里面如果包含单引号要如何转义?UNIX_TIMESTAMP('2017-11-26T01:00:00Z',

'yyyy-MM-dd'T'HH:mm:ss'Z'')?

  1. TIMESTAMP WITH TIME ZONE和TIMESTAMP WITH LOCAL TIME

ZONE这两种类型在什么情况下会用到?有例子吗?

谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

你可以用这篇文章中的 docker: https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml

这个容器里面的 ts 数据格式是 SQL 格式的。

  1. 像上述时间格式字段在Flink SQL中应该解析成什么类型? TIMESTAMP WITH LOCAL TIME ZONE, 1.12 的 json formart 才支持。
  2. 是的
  3. Flink 目前还不支持 TIMESTAMP WITH TIME ZONE。 'yyyy-MM-dd HH:mm:ss' 这种,对应的是 TIMESTAMP,代表无时区 timestamp long 值,或者 'yyyy-MM-dd HH:mm:ssZ' 这种是TIMESTAMP WITH LOCAL TIME ZONE ,代表session 时区的 timestamp

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370014?spm=a2c6h.13066369.question.87.33bf585fd5RWZf

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
9月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1190 43
|
9月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
573 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
8月前
|
SQL Java 关系型数据库
Java连接MySQL数据库环境设置指南
请注意,在实际部署时应该避免将敏感信息(如用户名和密码)硬编码在源码文件里面;应该使用配置文件或者环境变量等更为安全可靠地方式管理这些信息。此外,在处理大量数据时考虑使用PreparedStatement而不是Statement可以提高性能并防止SQL注入攻击;同时也要注意正确处理异常情况,并且确保所有打开过得资源都被正确关闭释放掉以防止内存泄漏等问题发生。
390 13
|
8月前
|
SQL 关系型数据库 MySQL
MySQL数据库连接过多(Too many connections)错误处理策略
综上所述,“Too many connections”错误处理策略涉及从具体参数配置到代码层面再到系统与架构设计全方位考量与改进。每项措施都需根据具体环境进行定制化调整,并且在执行任何变更前建议先行测试评估可能带来影响。
1750 11
|
8月前
|
SQL 监控 关系型数据库
查寻MySQL或SQL Server的连接数,并配置超时时间和最大连接量
以上步骤提供了直观、实用且易于理解且执行的指导方针来监管和优化数据库服务器配置。务必记得,在做任何重要变更前备份相关配置文件,并确保理解每个参数对系统性能可能产生影响后再做出调节。
811 11
|
8月前
|
SQL 关系型数据库 MySQL
排除通过IP访问MySQL时出现的连接错误问题
以上步骤涵盖了大多数遇到远程连接 MySQL 数据库时出现故障情形下所需采取措施,在执行每个步骤后都应该重新尝试建立链接以验证是否已经解决问题,在多数情形下按照以上顺序执行将能够有效地排除并修复大多数基本链接相关故障。
541 3
|
9月前
|
存储 关系型数据库 MySQL
修复.net Framework4.x连接MYSQL时遇到utf8mb3字符集不支持错误方案。
通过上述步骤大多数情况下能够解决由于UTF-encoding相关错误所带来影响,在实施过程当中要注意备份重要信息以防止意外发生造成无法挽回损失,并且逐一排查确认具体原因以采取针对性措施解除障碍。
629 12
|
9月前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
711 10
|
分布式计算 关系型数据库 MySQL
E-Mapreduce如何处理RDS的数据
目前网站的一些业务数据存在了数据库中,这些数据往往需要做进一步的分析,如:需要跟一些日志数据关联分析,或者需要进行一些如机器学习的分析。在阿里云上,目前E-Mapreduce可以满足这类进一步分析的需求。
5218 0
|
8月前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
536 158

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多