datastream有pulsar的连接器,sql我在maven仓库也可以搜到,都可以使用。此答案整理自钉群“Flink CDC 社区”
Pulsar是一种分布式的流处理系统,而Flink也是一种流处理系统,它们可以通过Pulsar Flink Connector来连接起来。
具体来说,你需要使用Pulsar Flink Connector,该连接器提供了将Pulsar作为Flink源或Flink汇聚的功能。使用该连接器,你可以轻松地从Pulsar主题中读取数据或将Flink处理结果写入Pulsar主题。
接下来是连接器的使用步骤:
在pom.xml文件中添加Pulsar Flink Connector的依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
PulsarSourceBuilder<String> builder = PulsarSource.builder(pulsarServiceUrl, StringSchema.of());
PulsarSource<String> source = builder.topic(topicName).subscriptionName(subName).build();
DataStream<String> stream = env.addSource(source, "Pulsar Source");
PulsarOptions pulsarOptions = PulsarOptions.builder()
.serviceUrl(pulsarServiceUrl)
.topic(topicName)
.build();
DataStreamSource<String> input = env.readTextFile(inputPath);
input.addSink(new PulsarSink<>(pulsarOptions, new SimpleStringSchema()));
以上三个步骤将Pulsar与Flink连接起来,你可以在Flink应用程序中使用Pulsar数据源或将输出写入Pulsar主题中。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。