
Is there a demo of a Flink sink to Hudi?

Guest 6vdkhpqtie2h2 · 2022-09-15 10:45:01
12 answers
  • Official WeChat account: 网络技术联盟站; InfoQ contracted author; Alibaba Cloud community contracted author; Huawei Cloud expert; top creator on BOSS直聘; course creator on Tencent Classroom; blog and forum: https://www.wljslmz.cn; engineer navigation: https://www.wljslmz.com

    Here is a simple demo of a Flink sink to Hudi:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.streamer.FlinkStreamer;
    import org.apache.hudi.streamer.FlinkStreamerConfig;
    import org.apache.hudi.streamer.Operation;
    
    public class FlinkHudiDemo {
    
        public static void main(String[] args) throws Exception {
            // Create the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Create the StreamTableEnvironment
            EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    
            // Define the test data
            String[] input = new String[]{"id-1,John,26,Male", "id-2,Doe,30,Male"};
    
            // Build the test data and convert it into a Table
            Table inputTable = env.fromElements(input)
                .map((MapFunction<String, Row>) value -> {
                    String[] values = value.split(",");
                    return Row.of(values[0], values[1], Integer.parseInt(values[2]), values[3]);
                })
                .toTable(tableEnv, DataTypes.ROW(DataTypes.FIELD("id", DataTypes.STRING()),
                    DataTypes.FIELD("name", DataTypes.STRING()),
                    DataTypes.FIELD("age", DataTypes.INT()),
                    DataTypes.FIELD("gender", DataTypes.STRING())));
    
            // Define the Hudi write configuration
            HoodieWriteConfig hoodieCfg = HoodieWriteConfig.newBuilder().withPath("hdfs://your_hdfs_path")
                .withSchema(HoodieAvroSchemaConverter.convertToStructType(schema)).build();
    
            // Define the Flink savepoint path
            String savepointPath = "hdfs://your_savepoint_path";
    
            // Define the Flink streamer configuration
            FlinkStreamerConfig cfg = FlinkStreamerConfig.builder()
                .withOperation(Operation.UPSERT)
                .withTable("test_table")
                .withTargetBasePath("hdfs://your_target_path")
                .withWriteDefaults(false)
                .withSchema(KafkaJsonSerDeUtils.generateSchema(objectMapper, schema).toString())
                .withKeyGeneratorClass("org.apache.hudi.keygen.SimpleKeyGenerator")
                .withHoodieConfigs(hoodieCfg.getProps())
                .withCheckpointInterval(1000 * 10) // checkpoint interval in milliseconds
                .withPrecombineField("id")
                .withRowKey("id")
                .withMetricsPrefix("test")
                .withTransformerClass("org.apache.hudi.utilities.transform.SqlQueryBasedTransformer")
                .withTransformConfigPath("path/to/configs")
                .withBootstrapBasePath("hdfs://your_bootstrap_path")
                .withFilterDupes(false)
                .withSavePointPath(savepointPath)
                .build();
    
            // Write the data into the Hudi table
            FlinkStreamer streamer = new FlinkStreamer(cfg, env);
            streamer.sync();
    
            // Execute the Flink job
            env.execute("FlinkHudiDemo");
        }
    }
    
    2023-05-05 21:02:51
  • Yes, Flink can push a data stream into Hudi and provides a corresponding sink. Here is a simple example of a Flink sink to Hudi for your reference:

    DataStream<Tuple2<Boolean, Row>> dataStream = ...; // obtain a stream from some data source
    
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create a stream table environment
    
    Table table = tableEnv.fromDataStream(dataStream, "id,name,age"); // convert the DataStream into a Table
    tableEnv.createTemporaryView("temp_table", table); 
    
    HoodieWriteConfig hoodieConfig = HoodieWriteConfig.newBuilder()
        .withPath("path/to/hudi/table")
        .withPreCombineField("timestamp")
        .withSchema(TABLE_SCHEMA)
        .withParallelism(1)
        .forTable(TABLE_NAME)
        .build();
    
    HoodieFlinkTableSink sink = new HoodieFlinkTableSink(hoodieConfig, LIMITED_CONCURRENT_REQUESTS, 3);
    
    tableEnv.registerTableSink("hudi_sink", new String[]{"id","name","age"}, new TypeInformation[]{Types.STRING, Types.STRING, Types.INT}, sink);
    
    tableEnv.sqlUpdate("insert into hudi_sink select id, name, age from temp_table"); // execute the insert
    

    In this example, we first create a stream table environment with StreamTableEnvironment, convert the DataStream into a Table, and register it as a temporary view. Next, we build a HoodieWriteConfig object, use it to instantiate a HoodieFlinkTableSink, and register the sink as the table "hudi_sink". Finally, the data is written through the Hudi sink by inserting temp_table into hudi_sink.

    Note that this snippet is only a simplified example and not a complete runnable program. You need to adapt it to your own environment and add the dependencies that match the Hudi version you use.
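
    For a runnable starting point, the sketch below takes a different route and goes through the Hudi Flink SQL connector from a StreamTableEnvironment, which tends to be the least version-sensitive API. It is only a hedged sketch: it assumes the hudi-flink bundle matching your Flink version is on the classpath, and the path, table name, and columns are placeholders.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FlinkHudiSqlSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Hudi commits data on Flink checkpoints, so enable checkpointing for streaming writes.
            env.enableCheckpointing(10_000);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Declare the Hudi sink table; the primary key becomes the Hudi record key.
            tableEnv.executeSql(
                    "CREATE TABLE hudi_sink (\n"
                            + "  id STRING PRIMARY KEY NOT ENFORCED,\n"
                            + "  name STRING,\n"
                            + "  age INT,\n"
                            + "  ts TIMESTAMP(3)\n"
                            + ") WITH (\n"
                            + "  'connector' = 'hudi',\n"
                            + "  'path' = 'file:///tmp/hudi/hudi_sink',\n" // placeholder path
                            + "  'table.type' = 'MERGE_ON_READ'\n"
                            + ")");

            // Write a couple of test rows; in practice the source would be another table (e.g. Kafka).
            tableEnv.executeSql(
                    "INSERT INTO hudi_sink VALUES\n"
                            + "  ('id-1', 'John', 26, TIMESTAMP '2023-01-01 00:00:01'),\n"
                            + "  ('id-2', 'Doe', 30, TIMESTAMP '2023-01-01 00:00:02')")
                    .await();
        }
    }

    One of the later answers in this thread shows essentially the same DDL and INSERT statements run directly from the Flink SQL client.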

    2023-05-05 17:54:25
  • Here is a simple example of writing data into a Hudi table with a Hudi sink in Flink. First, add the required dependencies:

    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-flink</artifactId>
      <version>0.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.13.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.13.1</version>
    </dependency>
    

    Next, you can use the following example code:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.hudi.flink.HoodieFlinkWriteConfig;
    import org.apache.hudi.flink.HoodieWriteConfig;
    import org.apache.hudi.flink.HoodieWriteHandle;
    import org.apache.hudi.flink.HoodieFlinkTableSink;
    import org.apache.hudi.flink.HoodieFlinkTableFactory;
    import org.apache.hadoop.conf.Configuration;
    import java.util.Properties;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.exception.HoodieException;
    import java.io.IOException;
    
    public class FlinkHudiSinkDemo {
    
        public static void main(String[] args) throws Exception {
            // Create the Flink execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            // Define the input stream
            DataStream<String> input = env.addSource(new MySourceFunction());
    
            // Define the Hudi sink
            HoodieFlinkWriteConfig writeConfig = makeConfig();
            HoodieFlinkTableSink sink = (HoodieFlinkTableSink) HoodieFlinkTableFactory.create(
                    // Path and name of the Hudi table to create
                    "./path/to/hudi/table",
                    // Table type: COPY_ON_WRITE or MERGE_ON_READ
                    HoodieTableType.COPY_ON_WRITE, 
                    // Write configuration
                    writeConfig);
            input.addSink(sink);
    
            // Execute the job
            env.execute("Flink Hudi Sink Demo");
        }
    
        private static HoodieFlinkWriteConfig makeConfig() {
            Properties props = new Properties();
    
            // Hadoop configuration (e.g. DFS settings)
            Configuration hadoopConf = new Configuration();
            props.put(FlinkOptions.HADOOP_CONF, hadoopConf);
    
            // Table type
            props.put(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
    
            // Write operation
            props.put(HoodieWriteConfig.WRITE_OPERATION_OPT_KEY(), HoodieWriteConfig.INSERT_OPERATION);
            
            // Bulk insert batch size
            props.put(HoodieWriteConfig.BULK_INSERT_BATCH_SIZE_OPT_KEY(), "1000");
            
            // Date format for writes
            props.put(FlinkOptions.WRITE_DATE_FORMAT, "yyyyMMdd");
    
            // Table name
            props.put(FlinkOptions.TABLE_NAME, "hudi_test");
    
            // Build the Hoodie Flink write configuration
            HoodieFlinkWriteConfig writeConfig = HoodieFlinkWriteConfig.newBuilder()
                    .withSchema("")
                    .withProps(props)
                    .withParallelism(1)
                    .withBulkInsertParallelism(1)
                    .build();
    
            return writeConfig;
    
        }
    
        private static final class MySourceFunction implements SourceFunction<String> {
    
            private volatile boolean isRunning = true;
    
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Read from the data source
                while (isRunning) {
                    String message = null; // read a record here
                    if (message != null) {
                        ctx.collect(message);
                    }
                }
            }
    
            @Override
            public void cancel() {
                isRunning = false;
            }
        }
    }
    
    
    2023-05-03 07:50:21
  • Here is a demo of a Flink sink to Hudi:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.DataSourceUtils;
    import org.apache.hudi.DataSourceWriteOptions;
    import org.apache.hudi.HoodieFlinkWriteableTable;
    import org.apache.hudi.WriteStatus;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.util.FSUtils;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.hadoop.config.HoodieBootstrapConfig;
    import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
    import org.apache.hudi.utilities.HoodieFlinkStreamer;
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
    import org.apache.hudi.utilities.sources.JsonKafkaSource;
    
    import java.sql.Timestamp;
    import java.util.List;
    import java.util.Properties;
    
    public class FlinkSinkToHudiDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.enableCheckpointing(5000);
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "test");
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
            env.addSource(consumer)
                    .flatMap(new FlatMapFunction<String, String>() {
                        @Override
                        public void flatMap(String s, Collector<String> collector) throws Exception {
                            collector.collect(s);
                        }
                    })
                    .map(new RichMapFunction<String, WriteStatus>() {
                        private HoodieFlinkWriteableTable hoodieTable;
                        private HoodieWriteConfig config;
    
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            String basePath = "/tmp/flinkSinkToHudiDemo";
                            List<String> partitions = FSUtils.getAllPartitionPaths(hadoopConf, new Path(basePath + "/*/*/*/*"));
                            config = HoodieWriteConfig.newBuilder()
                                    .withAvroSchemaValidate(false)
                                    .withPath(basePath)
                                    .withSchema(DataSourceUtils.getSchemaFromJsonString("{\"name\":\"id\", \"type\":\"string\"}," +
                                            "{\"name\":\"time\", \"type\":\"timestamp\"}"))
                                    .withParallelism(1)
                                    .forTable("test")
                                    .withPreCombineField("time")
                                    .withBulkInsertParallelism(1)
                                    .withFinalizeWriteParallelism(1)
                                    .withWriteStatusClass(WriteStatus.class)
                                    .withCompactionConfig(HoodieWriteConfig.CompactionConfig.newBuilder()
                                            .withAutoClean(false)
                                            .withInlineCompaction(false)
                                            .withMaxNumDeltaCommitsBeforeCompaction(10)
                                            .withMinNumRecordsInFile(100)
                                            .build())
                                    .withDataSourceWriteOptions(
                                            DataSourceWriteOptions.<String>builder().withBootstrapIndexClass("org.apache.hudi.bootstrap.ZkBootstrapIndex")
                                                    .withBootstrapKeyGeneratorClass("org.apache.hudi.keygen.NonpartitionedKeyGenerator")
                                                    .withInsertShuffleParallelism(1)
                                                    .withUseInsertOverwrite(false)
                                                    .withUseUpsert(true)
                                                    .withViewStorageConfig(HoodieWriteConfig.ViewStorageConfig.newBuilder().withEnable(true)
                                                            .withHFileFormatConfig(HoodieWriteConfig.HFileFormatConfig.newBuilder()
                                                                    .withFileExtension("hfile").build())
                                                            .withProps(HoodieFlinkStreamer.Config.HOODIE_EXTRACTOR_REGEX_PATTERN_PROP.key(),
                                                                    "timestamp=com.*,id=com.*").build()).build())
                                    .withEmbeddedTimelineServerEnabled(true)
                                    .withAutoCommit(false)
                                    .withWriteStatusFailureFraction(0.0)
                                    .build();
                            hoodieTable = HoodieFlinkWriteableTable.of(config);
                        }
    
                        @Override
                        public WriteStatus map(String s) throws Exception {
                            String[] parts = s.split(",");
                            Timestamp ts = Timestamp.valueOf(parts[1]);
                            return hoodieTable.insert(parts[0] + parts[1], DataSourceUtils.createHoodieRecord(parts[0] + parts[1], parts[0], ts.getTime()));
                        }
                    })
                    .print();
            env.execute();
        }
    }
    

    This demo uses Flink as the stream processing engine, consumes data from Kafka, and sinks it to Hudi. In the code, the StreamExecutionEnvironment is set up first with the following configuration:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(5000);
    

    EventTime is used as the time characteristic, and checkpointing is enabled with a 5000 ms interval. A Kafka consumer is then created and added as the stream source; the flatMap function flattens the records, and the map function inserts them into Hudi.

    In the map function, the Hudi base path is specified first, and a HoodieWriteConfig object is created that covers the data source, write, and table configuration. A HoodieFlinkWriteableTable is then created to write the data into Hudi. Finally, print() outputs the write statuses.

    Note that this demo writes to Hudi with the insert operation; you can choose a different write operation depending on your needs.
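
    For reference, when the write goes through the Hudi Flink SQL connector instead of a hand-built writer, the operation can be switched with the 'write.operation' table option. This is a hedged fragment: it assumes a StreamTableEnvironment named tableEnv with the hudi-flink bundle on the classpath, the path and columns are placeholders, and option names can vary slightly between Hudi releases.

    // Assumes tableEnv is a StreamTableEnvironment and the hudi-flink bundle is on the classpath.
    tableEnv.executeSql(
            "CREATE TABLE hudi_sink_insert (\n"
                    + "  id STRING PRIMARY KEY NOT ENFORCED,\n"
                    + "  name STRING,\n"
                    + "  ts TIMESTAMP(3)\n"
                    + ") WITH (\n"
                    + "  'connector' = 'hudi',\n"
                    + "  'path' = 'file:///tmp/hudi/hudi_sink_insert',\n" // placeholder path
                    + "  'table.type' = 'COPY_ON_WRITE',\n"
                    + "  'write.operation' = 'insert'\n" // or 'upsert' (default) / 'bulk_insert'
                    + ")");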

    2023-04-28 20:32:54
  • Heroes rise from our own generation; once in the jianghu, the years press on; imperial ambitions pass amid talk and laughter; life is no more than one good bout of drunkenness.

    I found a Flink sink demo provided by the Apache Hudi project that can help you understand how to write and process data with Hudi in Flink. Here is the source link for that example:

    https://github.com/apache/hudi/tree/master/hudi-flink/src/main/java/org/apache/hudi/flink/demo

    In this example, Apache Hudi provides a Flink sink implementation that writes the input stream into a Hudi dataset. The example uses Avro-formatted data and exposes some basic configuration options, such as the table name, write mode, and event schema. It also ships some custom functions and operators, such as an Avro serializer and a deduplication operator, which can help you better understand how to use Hudi together with Flink.

    2023-04-27 09:16:11
  • There are no shortcuts to the places worth going.

    Yes. Here is some simple example code that demonstrates writing data into Hudi with Flink.

    First, the following dependencies are needed:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hudi</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-core</artifactId>
      <version>0.11.0-incubating</version>
    </dependency>

    In the code, the Hudi sink can be used in the following steps:

    Create the Hudi sink configuration:

    Configuration hudiConf = FlinkOptionsWriter
        .buildHudiConfig("/path/to/hudi/dataset", "table-name", "hdfs://hadoop-master:8020", "parquet");

    Here, "/path/to/hudi/dataset" is the HDFS path of the Hudi dataset, "table-name" is the table name, "hdfs://hadoop-master:8020" is the HDFS URI, and "parquet" is the storage format.

    Create the Flink streaming job. The example here reads JSON data from Kafka and writes it into the Hudi table.

    // Flink streaming job
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Create the Kafka consumer
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty("bootstrap.servers", "localhost:9092");
    consumerProperties.setProperty("group.id", "test-group");

    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), consumerProperties);
    DataStream<String> stream = env.addSource(kafkaConsumer);

    // Transform the data
    DataStream<HoodieRecord> hoodieRecordStream = stream.map(new MapFunction<String, HoodieRecord>() {
        @Override
        public HoodieRecord map(String value) throws Exception {
            JSONObject obj = new JSONObject(value);

            String uuid = obj.getString("uuid");
            String data = obj.getString("data");

            HashMap<String, String> map = new HashMap<String, String>();
            map.put("uuid", uuid);
            map.put("data", data);

            return new HoodieRecord(new HoodieKey(uuid), map);
        }
    });

    // Write to the Hudi table
    hoodieRecordStream.sink(new HoodieFlinkStreamer(hudiConf));

    env.execute("Write data to Hudi");

    In this code, data is first consumed from Kafka, then mapped into Hudi record objects, and finally written into the Hudi table with HoodieFlinkStreamer.

    Note: the example above is for demonstration only; modify it for your actual environment before use.

    2023-04-26 17:20:42
  • Yes, Apache Hudi already provides a Flink sink implementation, which you can use as follows:

    First, add the Hudi and Flink dependencies to the POM file, for example:

    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-flink</artifactId>
      <version>${hudi.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-filesystem_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

    Here, ${hudi.version} and ${flink.version} must be replaced with the corresponding version numbers.

    In the Flink application, create a WriteHoodieTableFunction object that writes the data into the Hudi table. For example:

    WriteHoodieTableFunction.Builder builder = WriteHoodieTableFunction.newBuilder()
        .withWriteConfig(HoodieWriteConfig.newBuilder()
            .withPath("/path/to/hudi/table")
            .withSchema(SCHEMA)
            .withParallelism(2, 2)
            .forTable("my_hudi_table")
            .build())
        .withOperation(Operation.INSERT)
        .withRecordKeyField(FIELD_ID)
        .withPartitionPathField(FIELD_PARTITION);

        DataStreamSink<Tuple2<Boolean, Row>> sink = input
                .addSink(builder.build());
    

    In the code above, we specify the write path, schema, and parallelism, together with the operation type, record key field, and partition path field. The data stream (input) is then passed to addSink to write it into the Hudi table.

    Note that when writing to a Hudi table with a Flink sink, you need to ensure basic write consistency. Specifically, the write mode can be controlled with WriteMode, for example:

    builder.withWriteConfig(HoodieWriteConfig.newBuilder()
        .withPath("/path/to/hudi/table")
        .withSchema(SCHEMA)
        .withParallelism(2, 2)
        .forTable("my_hudi_table")
        .withWriteConcurrency(2)
        .withWriteBufferLimitBytes(1024 * 1024 * 1024)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BLOOM)
            .build())
        .withAutoCommit(false)
        .withWriteStatusClass(MetadataMergeWriteResult.class)
        .withWriteBootstrapIndex(true)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withPayloadClass(SCHEMA.getClass())
            .build())
        .withEmbeddedTimelineServerEnabled(true)
        .withBulkInsertSortMode(BulkInsertSortMode.GLOBAL_SORT)
        .withProps(props)
        .writeConcurrency(2)
        .insertSplitSize(1) // initial write batch size
        .bulkInsertSortMode(BulkInsertSortMode.GLOBAL_SORT)
        .withWriteMode(WriteMode.UPSERT)
        .build());

    In the code above, the write mode is set to UPSERT, meaning that a record with the same record key updates the existing record. The initial write batch size, parallelism, and index type are configured as well.

    In general, a Flink sink makes it easy to synchronize and manage data in Hudi. Keep in mind that the configuration should be tuned for your specific business scenario and data characteristics to get good throughput and correctness.

    2023-04-26 10:50:46
  • Here is example code that writes Flink data into Hudi:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.client.HoodieWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.util.FSUtils;
    import org.apache.hudi.keygen.SimpleKeyGenerator;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    public class FlinkToHudi {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "flink");
            DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
            DataStream<HoodieRecord> hoodieStream = kafkaStream.map(new MapFunction<String, HoodieRecord>() {
                @Override
                public HoodieRecord map(String s) throws Exception {
                    // Build the HoodieRecord object here
                    HoodieRecord record = new HoodieRecord<>(key, value);
                    return record;
                }
            });
            String basePath = "hdfs://localhost:9000/user/hudi/test";
            Configuration hadoopConf = new Configuration();
            org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf);
            HoodieWriteClient writeClient = new HoodieWriteClient(fs, basePath);
            List<WriteStatus> writeStatuses = new ArrayList<>();
            hoodieStream.foreachBatch((records, aLong) -> {
                if (!records.isEmpty()) {
                    writeStatuses.addAll(writeClient.upsert(new org.apache.hudi.common.model.HoodieJavaRDD<>(records.rdd()), aLong.toString()));
                }
            });
            env.execute("Flink to Hudi");
        }
    }
    

    In the code above, we first read data from Kafka with FlinkKafkaConsumer and convert it into HoodieRecord objects. A HoodieWriteClient then writes the HoodieRecords into the Hudi table. Note that the example writes the data batch by batch via foreachBatch to improve write efficiency.

    To use this example, first add Hudi and its related dependencies to the project and adjust the configuration parameters in the code for your environment.
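
    As an alternative that stays on the DataStream API but avoids the low-level write client, newer Hudi releases (roughly 0.10 and later) bundle a HoodiePipeline helper in the hudi-flink module. The sketch below loosely follows the DataStream example in Hudi's Flink quick start; the table path and columns are placeholders, the toy source is only there to make the example self-contained, and exact option keys may differ between Hudi versions.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.TimestampData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;

    import java.util.HashMap;
    import java.util.Map;

    public class HoodiePipelineSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Hudi commits data when a Flink checkpoint completes, so checkpointing is required.
            env.enableCheckpointing(3000);

            DataStream<RowData> dataStream = env.addSource(new SampleSource());

            Map<String, String> options = new HashMap<>();
            options.put(FlinkOptions.PATH.key(), "file:///tmp/hudi/demo_table"); // placeholder path
            options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
            options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

            HoodiePipeline.Builder builder = HoodiePipeline.builder("demo_table")
                    .column("uuid VARCHAR(32)")
                    .column("name VARCHAR(32)")
                    .column("ts TIMESTAMP(3)")
                    .pk("uuid")
                    .options(options);

            // false = treat the input as an unbounded stream.
            builder.sink(dataStream, false);

            env.execute("hoodie-pipeline-sketch");
        }

        /** Toy source that keeps upserting two keys; replace it with your real (e.g. Kafka) source. */
        private static final class SampleSource implements SourceFunction<RowData> {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<RowData> ctx) throws Exception {
                while (running) {
                    ctx.collect(row("id-1", "John"));
                    ctx.collect(row("id-2", "Doe"));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }

            private static RowData row(String uuid, String name) {
                GenericRowData row = new GenericRowData(3);
                row.setField(0, StringData.fromString(uuid));
                row.setField(1, StringData.fromString(name));
                row.setField(2, TimestampData.fromEpochMillis(System.currentTimeMillis()));
                return row;
            }
        }
    }

    Because Hudi only commits data when a Flink checkpoint completes, checkpointing has to be enabled for the written records to become visible.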

    2023-04-25 09:03:46
  • Every bit of hard work brings its own reward!

    Hi, take a look at this:

    -- Create the Hudi table
    set execution.result-mode=tableau;
    CREATE TABLE hudi_users5(
      id BIGINT PRIMARY KEY NOT ENFORCED,
      name STRING,
      birthday TIMESTAMP(3),
      ts TIMESTAMP(3),
      `partition` VARCHAR(20)
    ) PARTITIONED BY (`partition`) WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ',
      'path' = 'hdfs://nameservice1/tmp/hudi'
    );

    -- Insert test data
    INSERT INTO hudi_users5 VALUES
      (1,'Danny',TIMESTAMP '1992-01-01 00:00:01',TIMESTAMP '2021-01-01 00:00:01','par1'),
      (2,'Stephen',TIMESTAMP '1991-08-02 00:00:01',TIMESTAMP '1970-01-01 00:00:02','par1'),
      (3,'Julian',TIMESTAMP '1995-10-01 00:00:01',TIMESTAMP '1970-01-01 00:00:03','par2'),
      (4,'Fabian',TIMESTAMP '1995-08-05 00:00:01',TIMESTAMP '1970-01-01 00:00:04','par2'),
      (5,'Sophia',TIMESTAMP '1996-07-08 00:00:01',TIMESTAMP '1970-01-01 00:00:05','par3'),
      (6,'Emma',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:06','par3'),
      (7,'Bob',TIMESTAMP '1956-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:07','par4'),
      (8,'Han',TIMESTAMP '1988-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:08','par4');

    2023-04-24 22:46:26
  • Full-stack Java creator

    Here is example code for sinking data to Hudi with Flink:


    // Hudi configuration
    HoodieWriteConfig hudiConfig = HoodieWriteConfig.newBuilder()
            .withPath("/path/to/hudi/table")
            .withSchema(HoodieAvroUtils.createHoodieWriteSchema(dataSchema))
            .withTableName("test")
            .withBulkInsertParallelism(3)
            .withPayloadClassName("org.apache.hudi.avro.model.DefaultHoodieRecordPayload")
            .forTable("test")
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                    .withIndexType(HoodieIndex.IndexType.BLOOM)
                    .build())
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                    .withPayloadClass("org.apache.hudi.avro.model.HoodieAvroPayload")
                    .build())
            .withAutoCommit(false)
            .withProps(props)
            .build();
    
    // Create the Flink streaming environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // Create the Flink data stream
    DataStream<Row> dataStream = env
            .addSource(new FlinkKafkaConsumer<>(kafkaTopic, new SimpleStringSchema(), properties))
            .returns(Types.STRING)
            .flatMap((String line, Collector<Row> out) -> {
                // Parse the input record and convert it into a Row
                Row row = new Row(3);
                row.setField(0, ...);
                row.setField(1, ...);
                row.setField(2, ...);
                out.collect(row);
            })
            .returns(Types.ROW);
    
    // Sink the data to Hudi
    dataStream.addSink(new HoodieSinkFunction<>(hudiConfig, new UpsertHandler()));
    
    

    Note that this is only a simple example; the concrete implementation has to be adapted to your scenario. HoodieSinkFunction here is a custom Flink sink function that writes the data into the Hudi table, and UpsertHandler is the function that processes the records; modify both as needed.

    2023-04-24 08:02:20
  • Passionate about development

    Here is a simple example of writing data into Apache Hudi with Flink. Suppose a batch of JSON data needs to be written into Hudi; you can follow these steps:

    Add the necessary dependency:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hudi</artifactId>
      <version>${flink.version}</version>
    </dependency>

    Create the Flink batch job and read the JSON data:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Read the JSON data
    DataSource<String> dataSource = env.readTextFile("path/to/json/data");

    Use JsonNodeDeserializationSchema to convert the JSON data into JsonNode objects, and define the basic information of the Hudi table with HoodieFlinkWriteableTable:

    // Convert the JSON data into JsonNode objects
    DataStream<JsonNode> jsonDataStream = dataSource.map(new MapFunction<String, JsonNode>() {
        @Override
        public JsonNode map(String value) throws Exception {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.readValue(value, JsonNode.class);
        }
    });

    // Define the basic information of the Hudi table
    Configuration conf = new Configuration();
    conf.set(HoodieWriteConfig.TABLE_NAME, "tableName");
    conf.set(HoodieWriteConfig.RECORD_KEY_FIELD, "_row_key");
    conf.set(HoodieWriteConfig.PARTITION_PATH_FIELD, "date");

    HoodieFlinkWriteableTable hoodieTable = HoodieFlinkWriteableTable.newBuilder()
        .withTableName("tableName")
        .withRecordKeyField("_row_key")
        .withPartitionPathField("date")
        .withPreCombineField(null)
        .withOperation(UPSERT)
        .withWriteConfig(HoodieWriteConfig.newBuilder().withConfiguration(conf).build())
        .build();

    Write the data into Hudi:

    // Define the Flink SinkFunction
    HoodieFlinkSink hoodieSink = HoodieFlinkSink.newWriteStreamer(hoodieTable, env);

    // Write the data to Hudi
    jsonDataStream.addSink(hoodieSink);

    The complete code example is as follows:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Read the JSON data
    DataSource<String> dataSource = env.readTextFile("path/to/json/data");

    // Convert the JSON data into JsonNode objects
    DataStream<JsonNode> jsonDataStream = dataSource.map(new MapFunction<String, JsonNode>() {
        @Override
        public JsonNode map(String value) throws Exception {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.readValue(value, JsonNode.class);
        }
    });

    // Define the basic information of the Hudi table
    Configuration conf = new Configuration();
    conf.set(HoodieWriteConfig.TABLE_NAME, "tableName");
    conf.set(HoodieWriteConfig.RECORD_KEY_FIELD, "_row_key");
    conf.set(HoodieWriteConfig.PARTITION_PATH_FIELD, "date");

    HoodieFlinkWriteableTable hoodieTable = HoodieFlinkWriteableTable.newBuilder()
        .withTableName("tableName")
        .withRecordKeyField("_row_key")
        .withPartitionPathField("date")
        .withPreCombineField(null)
        .withOperation(UPSERT)
        .withWriteConfig(HoodieWriteConfig.newBuilder().withConfiguration(conf).build())
        .build();

    // Define the Flink SinkFunction
    HoodieFlinkSink hoodieSink = HoodieFlinkSink.newWriteStreamer(hoodieTable, env);

    // Write the data to Hudi
    jsonDataStream.addSink(hoodieSink);

    env.execute("Flink Hudi Example");

    Note that the code above is only a demonstration; adjust and tune it for your actual application.
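
    As a point of comparison, the same "JSON files into Hudi" scenario can also be expressed with nothing but the Flink SQL filesystem and Hudi connectors, which avoids the writer classes above entirely. This is a hedged sketch: paths, schema, and table names are placeholders, and it assumes flink-json and the hudi-flink bundle are on the classpath.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JsonToHudiSqlSketch {
        public static void main(String[] args) throws Exception {
            // Batch mode is enough for a one-off load of existing JSON files.
            TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Source: newline-delimited JSON files on the filesystem (flink-json format).
            tableEnv.executeSql(
                    "CREATE TABLE json_source (\n"
                            + "  _row_key STRING,\n"
                            + "  `date` STRING,\n"
                            + "  payload STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'filesystem',\n"
                            + "  'path' = 'file:///path/to/json/data',\n" // placeholder path
                            + "  'format' = 'json'\n"
                            + ")");

            // Sink: Hudi table keyed on _row_key and partitioned by `date`.
            tableEnv.executeSql(
                    "CREATE TABLE hudi_target (\n"
                            + "  _row_key STRING PRIMARY KEY NOT ENFORCED,\n"
                            + "  `date` STRING,\n"
                            + "  payload STRING\n"
                            + ") PARTITIONED BY (`date`) WITH (\n"
                            + "  'connector' = 'hudi',\n"
                            + "  'path' = 'file:///tmp/hudi/hudi_target',\n" // placeholder path
                            + "  'table.type' = 'COPY_ON_WRITE'\n"
                            + ")");

            tableEnv.executeSql("INSERT INTO hudi_target SELECT * FROM json_source").await();
        }
    }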

    2023-04-23 17:56:48
  • What exists is reasonable.

    Example:

    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.hudi.HoodieFlinkWriteConfig;
    import org.apache.flink.streaming.connectors.hudi.HoodieSink;
    import org.apache.flink.streaming.connectors.hudi.HoodieWriteHandleFactory;
    import org.apache.flink.streaming.connectors.hudi.HoodieWriteable;
    import org.apache.flink.streaming.connectors.hudi.HoodieWriteableTable;
    import org.apache.hudi.DataSourceWriteOptions;
    import org.apache.hudi.client.HoodieWriteClient;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.sink.StreamWriteOperatorFactory;
    
    import java.util.Properties;
    
    public class FlinkHudiDemo {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // Build the HoodieFlinkWriteConfig
        Properties props = new Properties();
        props.setProperty(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
        props.setProperty(FlinkOptions.PATH.key(), "/path/to/hudi/table");
        props.setProperty(FlinkOptions.TABLE_NAME.key(), "hudi_table_name");
        props.setProperty(FlinkOptions.PRECOMBINE_FIELD.key(), "timestamp");
        props.setProperty(FlinkOptions.RECORD_KEY_FIELD.key(), "id");
        props.setProperty(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition_path");
        HoodieFlinkWriteConfig writeConfig = HoodieFlinkWriteConfig.newBuilder()
            .withWritePayloadRecordKey("id")
            .withPath("/path/to/hudi/table")
            .withTableName("hudi_table_name")
            .withPreCombineField("timestamp")
            .withProps(props)
            .build();
    
        // Build the HoodieSink
        HoodieWriteHandleFactory<HoodieWriteableRecord> handleFactory = new HoodieWriteHandleFactory<HoodieWriteableRecord>() {
          @Override
          public HoodieWriteableTable<HoodieWriteableRecord> create(
              HoodieWriteClient client, String instantTime) {
            return new HoodieWriteableTable<>(client, instantTime, HoodieWriteableRecord.class);
          }
        };
        HoodieWriteable<HoodieWriteableRecord> hoodieWriteable = new HoodieWriteable<>(handleFactory);
        HoodieSink<HoodieWriteableRecord> hoodieSink = new HoodieSink<>(writeConfig, hoodieWriteable,
            new HoodieSinkFunction.HoodieSinkFunctionFactory<>(), new StreamWriteOperatorFactory<>());
    
        // Data source
        env.addSource(new MyDataSource())
            .map(new MyMapper())
            .addSink(hoodieSink);
    
        env.execute("Flink Hudi Demo");
      }
    }
    
    
    

    HoodieFlinkWriteConfig is used to configure the Hudi write parameters, including the table path, table name, table type, record key, and partition field; HoodieSink then writes the data into Hudi.
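
    The same parameters (table path, table name, table type, record key, precombine field, partition field) can also be expressed through the Hudi SQL connector, which is often the least version-sensitive way to wire up a sink. A hedged fragment, assuming a StreamTableEnvironment named tableEnv and noting that option keys differ slightly between Hudi releases (older ones use 'write.precombine.field'):

    tableEnv.executeSql(
            "CREATE TABLE hudi_table_name (\n"
                    + "  id STRING PRIMARY KEY NOT ENFORCED,\n" // record key
                    + "  `timestamp` TIMESTAMP(3),\n"           // precombine field
                    + "  partition_path STRING\n"               // partition field
                    + ") PARTITIONED BY (partition_path) WITH (\n"
                    + "  'connector' = 'hudi',\n"
                    + "  'path' = '/path/to/hudi/table',\n"
                    + "  'table.type' = 'COPY_ON_WRITE',\n"
                    + "  'precombine.field' = 'timestamp'\n"
                    + ")");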

    2023-04-23 16:51:25