Flink SQL 自定义 redis connector

本文涉及的产品
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
云原生内存数据库 Tair,内存型 2GB
实时计算 Flink 版,5000CU*H 3个月
简介: 一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector ?先来看一下官网的一张 connector 架构图:

一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector ?


先来看一下官网的一张 connector 架构图:



Metadata


Table API 和 SQL 都是声明式 API。这包括表的声明。因此,执行 CREATE TABLE 语句会导致目标目录中的元数据更新。对于大多数目录实现,不会为此类操作修改外部系统中的物理数据。特定于连接器的依赖项不必出现在类路径中。WITH 子句中声明的选项既不经过验证也不以其他方式解释。动态表的元数据(通过 DDL 创建或由目录提供)表示为 CatalogTable 的实例。必要时,表名将在内部解析为 CatalogTable。


Planning


在规划和优化表程序时,需要将 CatalogTable 解析为 DynamicTableSource(用于在 SELECT 查询中读取)和 DynamicTableSink(用于在 INSERT INTO 语句中写入)。


DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供特定于连接器的逻辑,用于将 CatalogTable 的元数据转换为 DynamicTableSource 和 DynamicTableSink 的实例。在大多数情况下,工厂的目的是验证选项(例如示例中的 'port' = '5022')、配置编码/解码格式(如果需要)以及创建表连接器的参数化实例。


默认情况下,使用 Java 的服务提供者接口 (SPI) 发现 DynamicTableSourceFactory 和 DynamicTableSinkFactory 的实例。连接器选项(例如示例中的 'connector' = 'custom')必须对应于有效的工厂标识符。


尽管在类命名中可能不明显,但 DynamicTableSource 和 DynamicTableSink 也可以被视为有状态的工厂,它们最终为读取/写入实际数据生成具体的运行时实现。


规划器使用源和接收器实例来执行特定于连接器的双向通信,直到找到最佳逻辑计划。根据可选声明的能力接口(例如 SupportsProjectionPushDown 或 SupportsOverwrite),规划器可能会对实例应用更改,从而改变生成的运行时实现。


Runtime


一旦逻辑规划完成,规划器将从表连接器获取运行时实现。运行时逻辑在 Flink 的核心连接器接口中实现,例如 InputFormat 或 SourceFunction。


这些接口按另一个抽象级别分组为 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子类。


例如,OutputFormatProvider(提供 org.apache.flink.api.common.io.OutputFormat)和 SinkFunctionProvider(提供 org.apache.flink.streaming.api.functions.sink.SinkFunction)都是 SinkRuntimeProvider 的具体实例,规划器可以 处理。


自定义 redis sink connector

大概需要下面 4 个过程:


自定义 Factory,根据需要实现 DynamicTableSourceFactory, DynamicTableSinkFactory.


自定义 TableSink, 实现 DynamicTableSink


定义 Options 也就是 connector 相关的属性


在 resource 下面添加配置文件 org.apache.flink.table.factories.Factory 里面添加 Factory 的全限定名


Factory

package flink.connector.redis;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
/**
 * 自定义 Factory 
 **/
public class RedisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new RedisDynamicTableSink(options);
    }
    /**
     * 这里没有实现 source 
     * @param context
     * @return
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return null;
    }
    @Override
    public String factoryIdentifier() {
        return "redis";
    }
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet();
        options.add(RedisOptions.HOST);
        options.add(RedisOptions.PORT);
        return options;
    }
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet();
        options.add(RedisOptions.EXPIRE);
        return options;
    }
}


TableSink


package flink.connector.redis;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import static flink.connector.redis.RedisOptions.*;
/**
 * 自定义 DynamicTableSink 
 **/
public class RedisDynamicTableSink implements DynamicTableSink {
    private ReadableConfig options;
    public RedisDynamicTableSink(ReadableConfig options) {
        this.options = options;
    }
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // 获取 redis 的 host 和 port
        String host = options.getOptional(HOST).get();
        Integer port = options.getOptional(PORT).get();
        Integer expire = options.getOptional(EXPIRE).get();
        MyRedisSink myRedisSink = new MyRedisSink(host, port, expire);
        return SinkFunctionProvider.of(myRedisSink);
    }
    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(this.options);
    }
    @Override
    public String asSummaryString() {
        return "redis table sink";
    }
}
package flink.connector.redis;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import redis.clients.jedis.Jedis;
/**
 * 自定义 sink 写入 redis
 **/
public class MyRedisSink extends RichSinkFunction<RowData> {
    private final String host;
    private final int port;
    private int expire;
    private Jedis jedis;
    public MyRedisSink(String host, int port, int expire) {
        this.host = host;
        this.port = port;
        this.expire = expire;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        this.jedis = new Jedis(host, port);
    }
    @Override
    public void invoke(RowData value, Context context) throws Exception {
        this.jedis.set(String.valueOf(value.getString(0)), String.valueOf(value.getInt(1)), "NX", "EX", expire);
    }
    @Override
    public void close() throws Exception {
        this.jedis.close();
    }
}


这里用的是 Jedis 没有使用 Flink 自带的 redis connector ,因为 Flink 自带的功能有限,很多功能都需要自己扩展,所以就直接使用 Jedis.我这里只是为了演示,只实现了最简单的 set 功能.


Options


package flink.connector.redis;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
/** 
 * Option utils for redis table source sink. 
 */ 
public class RedisOptions {
    private RedisOptions() {}
    public static final ConfigOption<String> HOST =
            ConfigOptions.key("host")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "The Redis table host.");
    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port")
                    .intType()
                    .defaultValue(6379)
                    .withDescription(
                            "The Redis table port.");
    public static final ConfigOption<Integer> EXPIRE =
            ConfigOptions.key("expire")
                    .intType()
                    .noDefaultValue()
                    .withDescription(
                            "The Redis table expire time.");
}


所有 redis 相关的属性都可以在这里添加,比如用户名密码,连接池相关的配置等.


配置文件



最后也是最重要的一点就是在 resource 下面添加配置文件,因为 Flink 是通过 SPI 机制来发现工厂的,如果有不了解 SPI 机制的可以看前面这篇文章,注意这个路径一定不要写错.

到这里基本就完成了,下面来测试一下自定义的 connector 能否把数据准确的写入到 redis 里面.


使用&测试


// 定义数据源表
tEnv.executeSql("""
                  |CREATE TABLE datagen (
                  | f_sequence INT,
                  | f_random INT,
                  | f_random_str STRING,
                  | ts AS localtimestamp,
                  | WATERMARK FOR ts AS ts
                  |) WITH (
                  | 'connector' = 'datagen',
                  | -- optional options --
                  | 'rows-per-second'='1',
                  | 'fields.f_sequence.kind'='sequence',
                  | 'fields.f_sequence.start'='1',
                  | 'fields.f_sequence.end'='20',
                  | 'fields.f_random.min'='1',
                  | 'fields.f_random.max'='1000',
                  | 'fields.f_random_str.length'='10'
                  |)
                  |""".stripMargin)
// 定义 redis 表
                  tEnv.executeSql(
            """
              |create table redis_sink (
              |f1 STRING,
              |f2 INT
              |) WITH (
              |'connector' = 'redis',
              |'host' = 'xxx',
              |'port' = '6379',
              |'expire' = '100'
              |)
              |""".stripMargin)
// 执行插入 SQL
              tEnv.executeSql(
            """
              |insert into redis_sink
              |select f_random_str,f_random
              |from datagen
              |""".stripMargin)


上面的 datagen 会产生 20 条数据.执行上面的 SQL 然后查询一下 redis 打印的数据如下:


68652c3a52 : 396
de3044d6d0 : 248
b09690ec10 : 436
dab4bb9ea9 : 821
d57a47d883 : 134
4d3d23767a : 63
9ca712a25f : 527
cb3019326d : 164
4a4af63f89 : 803
3cb960dbf1 : 575
db95bf7590 : 500
4274665b4b : 910
5c27396cb1 : 993
c1d957a2c8 : 951
8b24d7abe2 : 66
817b59d742 : 354
baa51bb58a : 14
db32f9cd53 : 510
3c5db2220b : 44
7c169eaef9 : 160


通过上面的 Demo,相信大家对自定义 Flink SQL connector 已经有所了解,那在生产环境中就可以根据自己的需求去定制各种 connector 了.

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
12天前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
49 15
|
14天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
38 2
|
14天前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
29 1
|
26天前
|
SQL 资源调度 流计算
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
705 7
阿里云实时计算Flink在多行业的应用和实践
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
26天前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
1月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。