import os

from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, \
    RollingPolicy
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.table import TableEnvironment, EnvironmentSettings


class AddSuffixProcessFunction:
    def process_element(value: str = ''):
        return [value + ' -->']


# @logger.catch
def process_files(dir_input, out_db_conf):
    # Create the Flink stream execution environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # Connector and driver jars
    env.add_jars("file:///data/cw/wdproc/_data/jar/flink-connector-jdbc-3.2.0-1.19.jar")
    env.add_jars("file:///data/cw/wdproc/_data/jar/postgresql-42.7.3.jar")
    # Set the parallelism
    env.set_parallelism(4)
    t_env = TableEnvironment.create(EnvironmentSettings.new_instance().in_batch_mode().build())
    # Collect all files in the input directory
    input_files = [os.path.join(dir_input, f) for f in os.listdir(dir_input) if
                   os.path.isfile(os.path.join(dir_input, f))]
    input_files = input_files[:4]
    for input_file in input_files:
        # Derive the output name for this file
        base_name = os.path.basename(input_file)
        # Create the file source
        file_source = FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                          input_file).process_static_file_set().build()
        ds = env.from_source(
            source=file_source,
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
        ds = ds.map(lambda x: AddSuffixProcessFunction.process_element(x), output_type=Types.ROW([Types.STRING()]))
        # Define the JDBC sink
        jdbc_url = out_db_conf['url']
        jdbc_table = out_db_conf['table']
        jdbc_driver = out_db_conf['driver']
        jdbc_user = out_db_conf['user']
        jdbc_password = out_db_conf['password']
        # Build the JdbcConnectionOptions
        jdbc_connection_options = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
            .with_url(jdbc_url) \
            .with_driver_name(jdbc_driver) \
            .with_user_name(jdbc_user) \
            .with_password(jdbc_password) \
            .build()
        # Build the JdbcExecutionOptions
        jdbc_execution_options = JdbcExecutionOptions.builder() \
            .with_batch_size(1000) \
            .with_batch_interval_ms(200) \
            .with_max_retries(3) \
            .build()
        # The SQL statement
        sql_dml_statement = "INSERT INTO " + jdbc_table + " (rowdata) VALUES (?)"
        # Build the JdbcSink
        jdbc_sink = JdbcSink.sink(
            sql_dml_statement,
            Types.ROW([Types.STRING()]),  # the output type
            jdbc_execution_options=jdbc_execution_options,
            jdbc_connection_options=jdbc_connection_options,
        )
        # Attach the JdbcSink to the data stream
        ds.sink_to(jdbc_sink)
    # Execute the job
    env.execute("File Processing")


# Example usage
dir_input = '/data/cw/wdproc/_data/testin'
dir_output = '/data/cw/wdproc/_data/testout'
out_db_conf = {
    'url': 'jdbc:postgresql://localhost:5432/etlr1',
    'table': 'td1',
    'driver': 'org.postgresql.Driver',
    'user': 'postgres',
    'password': 'pgtest'
}

if __name__ == "__main__":
    process_files(dir_input, out_db_conf)
Running it produces the following error:
python p0_flink/test_db.py
Traceback (most recent call last):
File "/data/cw/wdproc/p0_flink/test_db.py", line 111, in <module>
process_files(dir_input, out_db_conf)
File "/data/cw/wdproc/p0_flink/test_db.py", line 84, in process_files
jdbc_sink = JdbcSink.sink(
File "/data/cw/wdproc/.venv/lib/python3.9/site-packages/pyflink/datastream/connectors/jdbc.py", line 60, in sink
j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder',
File "/data/cw/wdproc/.venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/data/cw/wdproc/.venv/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
File "/data/cw/wdproc/.venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o124.getDeclaredMethod.
: java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
I've heard that the connector and the driver need to be matching versions, but these are the only jars I could find:
env.add_jars("file:///data/cw/wdproc/_data/jar/flink-connector-jdbc-3.2.0-1.19.jar")
env.add_jars("file:///data/cw/wdproc/_data/jar/postgresql-42.7.3.jar")
Please bear with me, I'm not familiar with Java. I'm using the default JDK 11 and Python 3.9.
Your PyFlink version is incompatible with the Flink version, or a matching dependency is missing. The stack trace shows that JdbcSink.sink() looks up createRowJdbcStatementBuilder(int[]) on org.apache.flink.connector.jdbc.internal.JdbcOutputFormat via reflection; the NoSuchMethodException means the connector jar on the classpath does not contain the internals that this build of PyFlink expects, i.e. the jar and the PyFlink/Flink versions do not line up.
Update PyFlink: make sure the PyFlink version you are using matches your Flink version:
pip install apache-flink==1.19.0  # keep this version number in sync with your Flink version
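To confirm what is actually installed, you can query the pip metadata (a minimal check using only the standard library; apache-flink is the pip distribution name that provides the pyflink module):

# Print the installed PyFlink version (importlib.metadata is in the
# standard library since Python 3.8, so it works on your Python 3.9).
import importlib.metadata

print(importlib.metadata.version("apache-flink"))  # should report 1.19.x here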
Add dependencies: make sure every required jar has been added to the environment:
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(
    "file:///data/cw/wdproc/_data/jar/flink-connector-jdbc-3.2.0-1.19.jar",
    "file:///data/cw/wdproc/_data/jar/postgresql-42.7.3.jar"
)

# Configure the JDBC sink (replace the placeholders with your own values)
jdbc_sink = JdbcSink.sink(
    "INSERT INTO your_table (rowdata) VALUES (?)",
    Types.ROW([Types.STRING()]),
    jdbc_connection_options=JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url("jdbc:postgresql://your-database-url")
        .with_driver_name("org.postgresql.Driver")
        .with_user_name("your_username")
        .with_password("your_password")
        .build(),
    jdbc_execution_options=JdbcExecutionOptions.builder()
        .with_batch_size(1000)
        .with_batch_interval_ms(200)
        .with_max_retries(3)
        .build()
)
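For completeness, a minimal sketch of wiring this sink into a job, assuming ds is a DataStream whose records already match Types.ROW([Types.STRING()]) (the stream and the job name here are placeholders):

# Attach the sink and run the job. JdbcSink.sink() returns a SinkFunction,
# so it is attached with add_sink() rather than sink_to().
ds.add_sink(jdbc_sink)
env.execute("jdbc sink example")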