Flink1.9新特性解读:通过Flink SQL查询Pulsar

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink1.9新特性解读:通过Flink SQL查询Pulsar

Flink1.9新增了很多的功能,其中一个对我们非常实用的特性通过Flink SQL查询Pulsar给大家介绍。

我们以前可能遇到过这样的问题。通过Spark读取Kafka,但是如果我们想查询kafka困难度有点大的,当然当前Spark也已经实现了可以通过Spark sql来查询kafka的数据。那么Flink 1.9又是如何实现通过Flink sql来查询Pulsar。

可能我们大多对kafka的比较熟悉的,但是对于Pulsar或许只是听说过,所以这里将Pulsar介绍下。

Pulsar简介


Pulsar由雅虎开发并开源的一个多租户、高可用,服务间的消息系统,目前是Apache软件基金会的孵化器项目。

Apache Pulsar是一个开源的分布式pub-sub消息系统,用于服务器到服务器消息传递的多租户,高性能解决方案,包括多个功能,例如Pulsar实例中对多个集群的本机支持,跨集群的消息的无缝geo-replication,非常低的发布和端到端 - 延迟,超过一百万个主题的无缝可扩展性,以及由Apache BookKeeper等提供的持久消息存储保证消息传递。

Pulsar已经在一些名企应用,比如腾讯用它类计费。而且它的扩展性是非常优秀的。下面是实际使用用户对他的认识。

和Puslar的原创团队做过一次挺深入交流,说说我的想法。我觉得Puslar是一个非常优秀的开源系统,它的整体框架偏向于HBase的设计,在其上实现了流数据的处理和服务。从与Kafka的对比上说,我个人对Kafka还是有比较深入的理解,Kafka也是很优秀的框架,给人一种非常纯粹和简洁的感觉。不过Puslar确实可以解决一些Kafka由于体系设计无法避免的痛点,最让我印象深刻的是Puslar的横向扩展能力要比Kafka好,因为Kafka的topic的性能扩展受限于partitions的个数,而Puslar是对数据分片,容易扩展。这对我们这种碰到大赛事需要扩展数倍系统吞吐能力的情景是很有用的。现在Puslar的框架都好了,缺的是整个生态,如监控,运维,管理,和其他平台和框架的对接,云服务的集成,丰富的客户端等等。

使用Flink sql 查询Pulsar流

Flink以前的版本并未真正实现查询Pulsar流,在Flink1.9版本中,由于阿里巴巴Blink对Flink存储库的贡献,使与Pulsar的集成更加强大。并且Flink1.9.0与Pulsar整合实现exactly-once流source和at-least-once流sink.

Pulsar作为Flink Catalog

通过集成可以将Pulsar注册为Flink Catalog【目录】,从而使在Pulsar流之上运行Flink查询只需几个命令即可。

这里补充下什么是Flink CatalogCatalog:所有对数据库和表的元数据信息都存放在Flink CataLog内部目录结构中,其存放了Flink内部所有与Table相关的元数据信息,包括表结构信息/数据源信息等。

Pulsar特点:


1.Pulsar中的数据schema与每个主题(topic)都相关联

2.生产者和消费者都发送带有预定义schema信息的数据

3.在兼容性检查中管理schema多版本化和演进

4.生产者和消费者是以POJO类方式发送和接受消息

下面是使用Struct模式创建生产者并发送消息

// Create producer with Struct schema and send messages
Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage()
  .value(User.builder()
    .userName(“pulsar-user”)
    .userId(1L)
    .build())
  .send();

使用Struct模式创建消费者并接收消息

// Create consumer with Struct schema and receive messages
Consumer<User> consumer = client.newCOnsumer(Schema.AVRO(User.class)).create();
consumer.receive();

Pulsar与Flink schema转换

Pulsar不仅能够处理和存储schema信息,而且还能够处理任何架构演变(必要时)。Pulsar将有效地管理broker中的任何schema 演变,在执行任何必要的兼容性检查的同时跟踪schema 的所有不同版本。

此外,当消息在生产者发布时,Pulsar将使用schema 版本标记每个消息,作为每个消息元数据的一部分。在消费者方面,当收到消息并反序列化元数据时,Pulsar将检查与此消息关联的schema 版本,并从broker中获取相应的schema信息。结果,当Pulsar与Flink应用程序集成时,它使用预先存在的schema信息,并将带有schema信息的单个消息映射到Flink的类型系统中的另一行。

对于Flink不直接与模式(schema)交互或不使用原始模式(例如,使用主题存储字符串或长数字)的情况,Pulsar会将消息有效负载转换为Flink行,称为“值”或-对于结构化模式类型(例如JSON和AVRO),Pulsar将从模式信息中提取各个字段,并将这些字段映射到Flink的类型系统。最后,与每个消息关联的所有元数据信息(例如消息键,主题,发布时间或事件时间)将转换为Flink行中的元数据字段。下面我们提供原始模式和结构化模式类型的示例,以及如何将它们从Pulsar主题(topic)转换为Flink的类型系统。

19e4001252cc30b51d1aa842a7eed59f.jpg

将所有schema信息映射到Flink的类型系统后,可以根据指定的schema信息开始在Flink中构建Pulsar源,接收器(sink)或目录(catalog ),如下所示:

Flink & Pulsar: 从Pulsar读取数据

为流查询创建Pulsar源

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")
val source = new FlinkPulsarSource(props)
// you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryable
val dataStream = env.addSource(source)(null)
// chain operations on dataStream of Row and sink the output
// end method chaining
env.execute()

在Pulsar注册topics 作为streaming表

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
  .connect(new Pulsar().properties(props))
  .inAppendMode()
  .registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()

Flink & Pulsar: 写数据到Pulsar

为流查询创建Pulsar sink

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))
env.execute()

写streaming表到Pulsar

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
  .connect(new Pulsar().properties(props))
  .inAppendMode()
  .registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()

对于Flink开发人员只需要指定Flink如何连接到Pulsar集群,将Pulsar集群注册为Flink中的源,接收器或流表,不必担心任何schema注册表或序列化/反序列化操作。这可以简化处理和查询数据的方式。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
12天前
|
SQL NoSQL Java
Java使用sql查询mongodb
通过使用 MongoDB Connector for BI 和 JDBC,开发者可以在 Java 中使用 SQL 语法查询 MongoDB 数据库。这种方法对于熟悉 SQL 的团队非常有帮助,能够快速实现对 MongoDB 数据的操作。同时,也需要注意到这种方法的性能和功能限制,根据具体应用场景进行选择和优化。
44 9
|
1月前
|
SQL 存储 人工智能
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
Vanna 是一个开源的 Python RAG(Retrieval-Augmented Generation)框架,能够基于大型语言模型(LLMs)为数据库生成精确的 SQL 查询。Vanna 支持多种 LLMs、向量数据库和 SQL 数据库,提供高准确性查询,同时确保数据库内容安全私密,不外泄。
112 7
Vanna:开源 AI 检索生成框架,自动生成精确的 SQL 查询
|
19天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
95 14
|
2月前
|
SQL Java
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
39 8
|
2月前
|
SQL 安全 PHP
PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全
本文深入探讨了PHP开发中防止SQL注入的方法,包括使用参数化查询、对用户输入进行过滤和验证、使用安全的框架和库等,旨在帮助开发者有效应对SQL注入这一常见安全威胁,保障应用安全。
68 4
|
2月前
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
|
2月前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
177 10
|
2月前
|
SQL 关系型数据库 MySQL
|
3月前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
2月前
|
SQL 关系型数据库 MySQL
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
37 0