请问如何在PYFLINK1.15下将connect steam进行sink? 我尝试用sink_to和add_sink都不行。 请提供具体方法和代码!
可以使用 DataStream.add_sink() 方法将数据流连接到 Sink 上。
参考这个示例代码:
from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
# 创建 StreamExecutionEnvironment 对象
env = StreamExecutionEnvironment.get_execution_environment()
# 创建数据流
data_stream = env.from_collection([(1, "hello"), (2, "world")], Types.TUPLE([Types.INT(), Types.STRING()]))
# 创建 StreamingFileSink 对象
sink = StreamingFileSink \
.for_row_format('./output', SimpleStringEncoder('utf-8')) \
.with_rolling_policy(
file_size=1024 * 1024 * 128, # 文件大小
rollover_interval=1000 * 60, # 滚动间隔
inactivity_interval=1000) # 空闲间隔
# 连接数据流和 Sink
data_stream.add_sink(sink)
# 执行作业
env.execute("Write data stream to local file system")
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。