开发者社区> 问答> 正文

Pyflink jdbc相关有问题

本地执行:

1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的

flink run:

1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。

> 2021年6月1日 下午4:33,琴师 <1129656513@qq.com> 写道:

>

> Hi,

>    我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 试着使用pyflink,遇到了问题,我引用了jdbc的jar包,但是仍然提示我jdbc错误。我的flink版本是1.13.1

> 我的原代码如下:

>

>

> from pyflink.datastream import StreamExecutionEnvironment

> from pyflink.table import StreamTableEnvironment, EnvironmentSettings

> env = StreamExecutionEnvironment.get_execution_environment()

> table_env = StreamTableEnvironment.create( env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

> table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")

>

>

> # 2. create source Table

> table_env.execute_sql("""

>

>

> CREATE TABLE table_source (

>   e string

> ) WITH (

>  'connector' = 'jdbc',

>   'url' = 'jdbc:mysql://********:3306/test',

>   'driver' = 'com.mysql.cj.jdbc.Driver',

>   'table-name' = 'enum_test',

>   'username' = 'pms_etl',

>   'password' = 'pms_etl_q'

> )

>

>

> """)

>

>

> # 3. create sink Table

> table_env.execute_sql("""

>     CREATE TABLE print (

>         e string

>     ) WITH (

>         'connector' = 'print'

>     )

> """)

>    

>

>

> table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()

>

>

>

> 我直接用python执行时候错误返回如下

>

>

> Traceback (most recent call last):

>   File "demo.py", line 41, in <module>

>     table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()

>   File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql

>     return TableResult(self._j_tenv.executeSql(stmt))

>   File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in call

>     answer, self.gateway_client, self.target_id, self.name)

>   File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco

>     return f(*a, **kw)

>   File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value

>     format(target_id, ".", name), value)

> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.

> : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.

>

>

> Table options are:

>

>

> 'connector'='jdbc'

> 'driver'='com.mysql.cj.jdbc.Driver'

> 'password'='pms_etl_q'

> 'table-name'='enum_test'

> 'url'='jdbc:mysql://*******:3306/test'

> 'username'='pms_etl'

>         at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)

>         at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)

>         at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)

>         at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)

>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)

>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)

>         at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)

>         at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)

>         at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)

>         at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)

>         at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)

>         at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)

>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)

>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

>         at java.lang.reflect.Method.invoke(Method.java:498)

>         at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

>         at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

>         at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

>         at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

>         at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)

>         at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

>         at java.lang.Thread.run(Thread.java:748)

> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'

>         at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)

>         at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)

>         at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)

>         ... 31 more

> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

>

>

> Available factory identifiers are:

>

>

> blackhole

> datagen

> filesystem

> print

>         at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)

>         at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)

>         ... 33 more

>

>

>

> 我用flink run -py demo.py 返回错误如下:

>

>

>   File "./demo.py", line 41, in <module>

>     table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()

>   File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql

>   File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in call

>   File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco

> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink default_catalog.default_database.table_sink does not exists

>         at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)

>         at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)

>         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

>         at scala.collection.Iterator.foreach(Iterator.scala:937)

>         at scala.collection.Iterator.foreach$(Iterator.scala:937)

>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)

>         at scala.collection.IterableLike.foreach(IterableLike.scala:70)

>         at scala.collection.IterableLike.foreach$(IterableLike.scala:69)

>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

>         at scala.collection.TraversableLike.map(TraversableLike.scala:233)

>         at scala.collection.TraversableLike.map$(TraversableLike.scala:226)

>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)

>         at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)

>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)

>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)

>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)

>         at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)

>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

>         at java.lang.reflect.Method.invoke(Method.java:498)

>         at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

>         at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

>         at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

>         at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

>         at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)

>         at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

>         at java.lang.Thread.run(Thread.java:748)

>

>

> org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1

>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)

>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

>         at java.lang.reflect.Method.invoke(Method.java:498)

>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)

>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)

>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

>         at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

>         at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)

>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

> Caused by: java.lang.RuntimeException: Python process exits with code: 1

>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)

>         ... 13 more

>

>

>

> 请问我该如何解决?*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-02 14:44:31 851 0
1 条回答
写回答
取消 提交回答
  • 这样试试,把”\”改成”/“:

    file:///D:/Pyproject/flink-connector-jdbc_2.11-1.13.1.jar*来自志愿者整理的FLINK邮件归档

    2021-12-02 14:57:47
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载