Question 1: I used Flink's SQL Client with a JDBC connection to import data from SQL Server into a Hudi table, and then tried to write the data from Hudi into another SQL Server table, but got an error:
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction
What is causing this? And can Flink JDBC be used to write data from Hudi into a database?
Reference answer:
The error message says that the class com.mysql.jdbc.Driver is deprecated and that com.mysql.cj.jdbc.Driver should be used instead. Update your JDBC driver and switch to the new com.mysql.cj.jdbc.Driver class.
The message also contains java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction, which is usually caused by a Flink version mismatch. Check that your Flink version and the versions of its related dependencies are compatible, and try upgrading Flink or downgrading the conflicting dependencies.
As for writing data from Hudi to a database with Flink JDBC: yes, that works. See the examples in the official Flink documentation, for instance using JdbcOutputFormat to write to a database. Make sure the JDBC connection settings and SQL statements are configured correctly, and that the data types and table schemas are compatible.
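A minimal sketch of the Flink SQL route described above: declare a JDBC sink table and INSERT into it from the Hudi table. All table names, columns, and connection values here are placeholders, not taken from the question; the commented 'driver' option shows where the new MySQL driver class would be set explicitly.

```sql
-- Hypothetical sketch: write from an existing Hudi table into a JDBC sink.
-- Table names, schema, and connection details are assumptions.
CREATE TABLE jdbc_target (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:sqlserver://localhost:1433;databaseName=demo',
  -- for a MySQL target, set the new driver class explicitly:
  -- 'driver' = 'com.mysql.cj.jdbc.Driver',
  'table-name' = 'target_table',
  'username' = 'user',
  'password' = 'password'
);

INSERT INTO jdbc_target SELECT id, name FROM hudi_source;
```

The sink's column types must line up with the Hudi table's types, or the INSERT will fail validation before any rows are written.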
For more answers to this question, see the original post: https://developer.aliyun.com/ask/496537?spm=a2c6h.14164896.0.0.169063bfNaEsRK
Question 2: What causes this error when using Flink SQL CDC with PostgreSQL or MySQL? Error: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'
Reference answer:
Most likely one of your connector options is wrong.
For more answers to this question, see the original post: https://developer.aliyun.com/ask/492408?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc
Question 3: Could anyone explain what causes this error when using Flink SQL CDC with PostgreSQL or MySQL? Error: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'
Reference answer:
The error indicates that the Flink SQL CDC module failed to create a source for the PostgreSQL or MySQL table.
Check the database connection settings, whether the database and table actually exist, the connector/driver dependencies, and the database access permissions.
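For reference, a minimal sketch of a postgres-cdc source table definition; all connection values below are placeholders. A missing or misspelled required option (for PostgreSQL, 'schema-name' is required in addition to 'database-name' and 'table-name') is a common way to trigger the "Unable to create a source" error.

```sql
-- Hypothetical postgres-cdc source; hostname, credentials, and names are assumptions.
CREATE TABLE pgsql_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'password',
  'database-name' = 'demo',
  'schema-name' = 'public',   -- required for PostgreSQL, easy to forget
  'table-name' = 'users'
);
```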
For more answers to this question, see the original post: https://developer.aliyun.com/ask/492444?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc
Question 4: Running Flink SQL from code fails.
Error:
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
It looks like two identical DynamicTableSinkFactory classes were found.
The code is as follows:

package org.apache.flink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class CDC2ss2 {
    public static void main(String[] args) throws Exception {
        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv;
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        tEnv = StreamTableEnvironment.create(env, settings);
        String src_sql = "CREATE TABLE userss (\n" +
                "  user_id INT,\n" +
                "  user_nm STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '10.12.5.37',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'dps',\n" +
                "  'password' = 'dps1234',\n" +
                "  'database-name' = 'rpt',\n" +
                "  'table-name' = 'users'\n" +
                ")";
        tEnv.executeSql(src_sql);  // create the source table
        String sink = "CREATE TABLE sink (\n" +
                "  user_id INT,\n" +
                "  user_nm STRING,\n" +
                "  primary key(user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
                "  'username' = 'dps',\n" +
                "  'password' = 'dps1234',\n" +
                "  'table-name' = 'sink'\n" +
                ")";
        String to_print_sql = "insert into sink select user_id, user_nm from userss";
        tEnv.executeSql(sink);
        tEnv.executeSql(to_print_sql);
        env.execute();
    }
}
Full error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
Table options are:
'connector'='jdbc'
'password'='dps1234'
'table-name'='sink'
'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
'username'='dps'
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
... 18 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
Ambiguous factory classes are:
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 19 more
Process finished with exit code 1
*From a volunteer-curated archive of the Flink mailing list
Reference answer:
Which Flink version are you using? Have you implemented your own DynamicTableSinkFactory whose factoryIdentifier method also returns the 'jdbc' connector identifier?
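Another frequent cause of this exact error (the log even degenerates to listing java.util.LinkedList as the "ambiguous factory classes") is a fat jar built without merging the SPI service files, so several connectors' META-INF/services entries collide. A sketch of the usual fix, assuming a Maven build with the shade plugin:

```xml
<!-- Sketch, assuming maven-shade-plugin: the ServicesResourceTransformer merges
     META-INF/services files from all dependency jars instead of letting one
     overwrite another, which avoids duplicate-factory discovery errors. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

If you are not shading, check instead for two different versions of flink-connector-jdbc on the classpath (for example one bundled in the job jar and one in Flink's lib directory).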
For more answers to this question, see the original post: https://developer.aliyun.com/ask/370089?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc
Question 5: Flink SQL reads data from RocketMQ but reports an error. What is the cause? (The error was attached as a screenshot in the original post; the image is not reproduced here.)
Reference answer:
The RocketMQ community's Flink SQL interface currently has limited support and quite a few bugs. The fix I submitted cannot run on master because several unreviewed changes were merged before the current version, leaving master unable to start. Use the version in my repository instead; this link may have what you need: https://github.com/deemogsw/rocketMQ-flink-connector
For more answers to this question, see the original post: https://developer.aliyun.com/ask/488157?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc