开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问有人知道pulsar和flink怎么对接起来吗?

请问有人知道pulsar和flink怎么对接起来吗?

展开
收起
十一0204 2023-04-10 20:45:19 176 0
2 条回答
写回答
取消 提交回答
  • 意中人就是我呀!

    datastream有pulsar的连接器,sql我在maven仓库也可以搜到,都可以使用。此答案整理自钉群“Flink CDC 社区”

    2023-04-12 08:48:50
    赞同 展开评论 打赏
  • 坚持这件事孤独又漫长。

    Pulsar是一种分布式的流处理系统,而Flink也是一种流处理系统,它们可以通过Pulsar Flink Connector来连接起来。

    • 具体来说,你需要使用Pulsar Flink Connector,该连接器提供了将Pulsar作为Flink源或Flink汇聚的功能。使用该连接器,你可以轻松地从Pulsar主题中读取数据或将Flink处理结果写入Pulsar主题。

    • 接下来是连接器的使用步骤:

    • 在pom.xml文件中添加Pulsar Flink Connector的依赖项:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    1. 在Flink的源代码中,使用Pulsar输入流:
    PulsarSourceBuilder<String> builder = PulsarSource.builder(pulsarServiceUrl, StringSchema.of());
    PulsarSource<String> source = builder.topic(topicName).subscriptionName(subName).build();
    DataStream<String> stream = env.addSource(source, "Pulsar Source");
    
    1. 在Flink代码中,使用Pulsar输出流:
    PulsarOptions pulsarOptions = PulsarOptions.builder()
                .serviceUrl(pulsarServiceUrl)
                .topic(topicName)
                .build();
    DataStreamSource<String> input = env.readTextFile(inputPath);
    input.addSink(new PulsarSink<>(pulsarOptions, new SimpleStringSchema()));
    

    以上三个步骤将Pulsar与Flink连接起来,你可以在Flink应用程序中使用Pulsar数据源或将输出写入Pulsar主题中。

    2023-04-11 08:47:15
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载