以下是一个简单的 Flink Sink 到 Hudi 的 Demo 示例:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.streamer.FlinkStreamer;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.streamer.Operation;
import org.apache.flink.table.api.EnvironmentSettings;
public class FlinkHudiDemo {
public static void main(String[] args) throws Exception {
// 定义 Flink Execution Environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义 StreamTableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// 定义测试数据
String[] input = new String[]{"id-1,John,26,Male", "id-2,Doe,30,Male"};
// 构造测试数据,转换为 Table 类型
Table inputTable = env.fromElements(input)
.map((MapFunction<String, Row>) value -> {
String[] values = value.split(",");
return Row.of(values[0], values[1], Integer.parseInt(values[2]), values[3]);
})
.toTable(tableEnv, DataTypes.ROW(DataTypes.FIELD("id", DataTypes.STRING()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("gender", DataTypes.STRING())));
// 定义 Hudi 配置参数
BatchWriteConfig hoodieCfg = HoodieWriteConfig.newBuilder().withPath("hdfs://your_hdfs_path")
.withSchema(HoodieAvroSchemaConverter.convertToStructType(schema)).build();
// 定义 Flink save point 路径
String savepointPath = "hdfs://your_savepoint_path";
// 定义 Flink Streamer 配置参数
FlinkStreamerConfig cfg = FlinkStreamerConfig.builder()
.withOperation(Operation.UPSERT)
.withTable("test_table")
.withTargetBasePath("hdfs://your_target_path")
.withWriteDefaults(false)
.withSchema(KafkaJsonSerDeUtils.generateSchema(objectMapper, schema).toString())
.withKeyGeneratorClass("org.apache.hudi.keygen.SimpleKeyGenerator")
.withHoodieConfigs(hoodieCfg.getProps())
.withCheckpointInterval(1000 * 10) // 设置checkpoint 间隔时间
.withPrecombineField("id")
.withRowKey("id")
.withMetricsPrefix("test")
.withTransformerClass("org.apache.hudi.utilities.transform.SqlQueryBasedTransformer")
.withTransformConfigPath("path/to/configs")
.withBootstrapBasePath("hdfs://your_bootstrap_path")
.withFilterDupes(false)
.withSavePointPath(savepointPath)
.build();
// 将数据写入到 Hudi 表中
FlinkStreamer streamer = new FlinkStreamer(cfg, env);
streamer.sync();
// 执行 flink 程序
env.execute("FlinkHudiDemo");
}
}
是的,Flink 提供了将数据流推送到 Hudi 的功能,并且提供了对应的 sink。以下是一个简单的 Flink Sink 到 Hudi 的示例,供您参考:
DataStream<Tuple2<Boolean, Row>> dataStream = ...; // 从某个数据源获取一个流
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 创建一个流表环境
Table table = tableEnv.fromDataStream(dataStream, "id,name,age"); // 将数据流转换为 Table
tableEnv.createTemporaryView("temp_table", table);
HoodieConfig hoodieConfig = HoodieWriterConfig.newBuilder()
.withPath("path/to/hudi/table")
.withPreCombineField("timestamp")
.withSchema(TABLE_SCHEMA)
.withParallelism(1)
.forTable(TABLE_NAME)
.build();
HoodieFlinkTableSink sink = new HoodieFlinkTableSink(hoodieConfig, LIMITED_CONCURRENT_REQUESTS, 3);
tableEnv.registerTableSink("hudi_sink", new String[]{"id","name","age"}, new TypeInformation[]{Types.STRING, Types.STRING, Types.INT}, sink);
tableEnv.sqlUpdate("insert into hudi_sink select id, name, age from temp_table"); // 执行插入操作
在这个示例中,我们首先使用 StreamTableEnvironment
创建一个流表环境,将 DataStream
转换为 Table
并注册为临时表。接下来,我们创建了一个 HoodieConfig
对象并使用它实例化了一个 HoodieFlinkTableSink
对象,最后将其注册为表格 “hudi_sink”。我们还通过将 temp_table
插入 hudi_sink
表格来执行 Hudi Sink 操作。
需要注意的是,示例中的代码片段只是一个简单的示例,并不是完整的可运行程序。您需要根据实际情况进行适当的修改,并根据您选择的 Hudi 版本添加相应的依赖关系。
下面是一个简单的示例,在Flink中使用Hudi Sink将数据写入到Hudi的表中。 首先,需要引入相关的依赖:
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.1</version>
</dependency>
接下来,可以使用以下示例代码:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.hudi.flink.HoodieFlinkWriteConfig;
import org.apache.hudi.flink.HoodieWriteConfig;
import org.apache.hudi.flink.HoodieWriteHandle;
import org.apache.hudi.flink.HoodieFlinkTableSink;
import org.apache.hudi.flink.HoodieFlinkTableFactory;
import org.apache.hudi.hadoop.config.HoodieWriteConfig;
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import java.io.IOException;
public class FlinkHudiSinkDemo {
public static void main(String[] args) throws Exception {
// 创建Flink执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
// 定义输入流
DataStream<String> input = env.addSource(new MySourceFunction());
// 定义Hudi Sink
HoodieFlinkWriteConfig writeConfig = makeConfig();
HoodieFlinkTableSink sink = (HoodieFlinkTableSink) HoodieFlinkTableFactory.create(
// 设置生成Hudi表的路径和名称
"./path/to/hudi/table",
// 设置表类型为COPY_ON_WRITE或MERGE_ON_READ
HoodieTableType.COPY_ON_WRITE,
// 设置write配置
writeConfig);
input.addSink(sink);
// 执行任务
env.execute("Flink Hudi Sink Demo");
}
private static HoodieFlinkWriteConfig makeConfig() {
Properties props = new Properties();
// 设置Hadoop配置(如DFS配置)
Configuration hadoopConf = new Configuration();
props.put(FlinkOptions.HADOOP_CONF, hadoopConf);
// 设置表类型
props.put(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
// 设置写入模式
props.put(HoodieWriteConfig.WRITE_OPERATION_OPT_KEY(), HoodieWriteConfig.INSERT_OPERATION);
// 设置写入批次大小
props.put(HoodieWriteConfig.BULK_INSERT_BATCH_SIZE_OPT_KEY(), "1000");
// 设置日期时间格式
props.put(FlinkOptions.WRITE_DATE_FORMAT, "yyyyMMdd");
// 设置表名称
props.put(FlinkOptions.TABLE_NAME, "hudi_test");
// 构造Hoodie Flink写入配置对象
HoodieFlinkWriteConfig writeConfig = HoodieFlinkWriteConfig.newBuilder()
.withSchema("")
.withProps(props)
.withParallelism(1)
.withBulkInsertParallelism(1)
.build();
return writeConfig;
}
private static final class MySourceFunction implements SourceFunction<String> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
// 读取数据源
while (isRunning) {
String message = null; // 读取数据
if (message != null) {
ctx.collect(message);
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}
}
以下是一个Flink Sink到Hudi的demo:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieFlinkWriteableTable;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.config.HoodieBootstrapConfig;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.utilities.HoodieFlinkStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import java.sql.Timestamp;
import java.util.List;
import java.util.Properties;
public class FlinkSinkToHudiDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
env.addSource(consumer)
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
collector.collect(s);
}
})
.map(new RichMapFunction<String, WriteStatus>() {
private HoodieFlinkWriteableTable hoodieTable;
private HoodieWriteConfig config;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String basePath = "/tmp/flinkSinkToHudiDemo";
List<String> partitions = FSUtils.getAllPartitionPaths(hadoopConf, new Path(basePath + "/*/*/*/*"));
config = HoodieWriteConfig.newBuilder()
.withAvroSchemaValidate(false)
.withPath(basePath)
.withSchema(DataSourceUtils.getSchemaFromJsonString("{\"name\":\"id\", \"type\":\"string\"}," +
"{\"name\":\"time\", \"type\":\"timestamp\"}"))
.withParallelism(1)
.forTable("test")
.withPreCombineField("time")
.withBulkInsertParallelism(1)
.withFinalizeWriteParallelism(1)
.withWriteStatusClass(WriteStatus.class)
.withCompactionConfig(HoodieWriteConfig.CompactionConfig.newBuilder()
.withAutoClean(false)
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(10)
.withMinNumRecordsInFile(100)
.build())
.withDataSourceWriteOptions(
DataSourceWriteOptions.<String>builder().withBootstrapIndexClass("org.apache.hudi.bootstrap.ZkBootstrapIndex")
.withBootstrapKeyGeneratorClass("org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.withInsertShuffleParallelism(1)
.withUseInsertOverwrite(false)
.withUseUpsert(true)
.withViewStorageConfig(HoodieWriteConfig.ViewStorageConfig.newBuilder().withEnable(true)
.withHFileFormatConfig(HoodieWriteConfig.HFileFormatConfig.newBuilder()
.withFileExtension("hfile").build())
.withProps(HoodieFlinkStreamer.Config.HOODIE_EXTRACTOR_REGEX_PATTERN_PROP.key(),
"timestamp=com.*,id=com.*").build()).build())
.withEmbeddedTimelineServerEnabled(true)
.withAutoCommit(false)
.withWriteStatusFailureFraction(0.0)
.build();
hoodieTable = HoodieFlinkWriteableTable.of(config);
}
@Override
public WriteStatus map(String s) throws Exception {
String[] parts = s.split(",");
Timestamp ts = Timestamp.valueOf(parts[1]);
return hoodieTable.insert(parts[0] + parts[1], DataSourceUtils.createHoodieRecord(parts[0] + parts[1], parts[0], ts.getTime()));
}
})
.print();
env.execute();
}
}
这个demo包括了Flink作为数据流处理引擎,从Kafka消费数据,然后sink到Hudi。在代码中,首先设置了StreamExecutionEnvironment,并指定了如下的配置:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(5000);
这里使用了EventTime作为时间特性,然后启用了checkpoint,每5000ms进行一次checkpoint。然后创建了一个Kafka Consumer,并添加到stream处理数据,flatMap函数将数据扁平化,然后map函数将数据插入到Hudi中。
在map函数中,首先指定了Hudi的base path,然后创建了HoodieWriteConfig对象,主要包括了数据源配置、数据写入配置、Hudi表的配置等等。之后创建了一个HoodieFlinkWriteableTable对象,将数据写入到Hudi中。最后,使用print()方法将结果打印出来。
需要注意的是,此demo中使用的是插入数据的方式写入Hudi,可以根据实际情况选择不同的写入方式。
我找到了一个 Apache Hudi 官方提供的 Flink Sink 的 demo 示例,可以帮助您了解如何在 Flink 中使用 Hudi 进行数据写入和处理。以下是该示例的来源链接:
https://github.com/apache/hudi/tree/master/hudi-flink/src/main/java/org/apache/hudi/flink/demo
在这个示例中,Apache Hudi 提供了一个 Flink Sink 的实现,可以将输入流的数据写入到 Hudi 数据库中。该示例使用 Avro 格式的数据进行演示,并且提供了一些基本的配置选项,如数据表名称、数据写入模式、事件模式等。同时,该示例还提供了一些自定义的函数和操作符,如 Avro 序列化器、减少重复数据的操作符等,可以帮助您更好地理解和使用 Hudi 和 Flink。
是的,这里提供一个简单的示例代码来演示使用Flink将数据写入Hudi中。
首先,需要使用以下依赖项:
org.apache.flink flink-hudi 1.14.0 org.apache.hudi hudi-core 0.11.0-incubating 在代码中,可以按照以下步骤来使用Hudi Sink:
创建Hudi Sink配置 Configuration hudiConf = FlinkOptionsWriter .buildHudiConfig("/path/to/hudi/dataset", "table-name", "hdfs://hadoop-master:8020", "parquet"); 其中,"/path/to/hudi/dataset"是指Hudi表所在的HDFS路径,"table-name"是表名,"hdfs://hadoop-master:8020"是指向HDFS的URI,"parquet"是存储格式。
创建Flink流式处理作业 在这里只提供一个示例,读取Kafka中的JSON数据,并将其写入Hudi表中。
// Flink流式处理程序 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建Kafka消费者 Properties consumerProperties = new Properties(); consumerProperties.setProperty("bootstrap.servers", "localhost:9092"); consumerProperties.setProperty("group.id", "test-group");
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), consumerProperties); DataStream stream = env.addSource(kafkaConsumer);
// 数据变换 DataStream hoodieRecordStream = stream.map(new MapFunction<String, HoodieRecord>() { @Override public HoodieRecord map(String value) throws Exception { JSONObject obj = new JSONObject(value);
String uuid = obj.getString("uuid");
String data = obj.getString("data");
HashMap<String, String> map = new HashMap<String, String>();
map.put("uuid", uuid);
map.put("data", data);
return new HoodieRecord(new HoodieKey(uuid), map);
}
});
// 写入Hudi表 hudiRecordStream.sink(new HoodieFlinkStreamer(hudiConf));
env.execute("Write data to Hudi"); 在这段代码中,首先从Kafka消费数据,然后将其映射为Hudi记录对象,最后使用HoodieFlinkStreamer将数据写入Hudi表中。
注意:以上示例仅用于演示目的,实际使用时需要根据具体情况进行修改。
是的,Apache Hudi已经提供了Flink Sink的实现,您可以通过以下步骤来使用:
首先,需要在POM文件中添加Hudi和Flink的依赖,例如: org.apache.hudi hudi-flink ${hudi.version}
org.apache.flink flink-connector-filesystem_2.11 ${flink.version} 其中,${hudi.version}和${flink.version}需要替换为相应的版本号。
在Flink应用程序中,创建一个WriteHoodieTableFunction对象,用于将数据写入Hudi表中。例如: WriteHoodieTableFunction.Builder builder = WriteHoodieTableFunction.newBuilder() .withWriteConfig(HoodieWriteConfig.newBuilder() .withPath("/path/to/hudi/table") .withSchema(SCHEMA) .withParallelism(2, 2) .forTable("my_hudi_table") .build()) .withOperation(Operation.INSERT) .withRecordKeyField(FIELD_ID) .withPartitionPathField(FIELD_PARTITION);
DataStreamSink<Tuple2<Boolean, Row>> sink = input
.addSink(builder.build());
在上述代码中,我们指定了数据写入的路径、模式、并行度等参数,并且设置了操作类型、记录键字段和分区路径字段等信息。然后将数据流(input)传递给addSink方法,将其写入Hudi表中。
需要注意的是,在使用Flink Sink写入Hudi表时,需要保证数据的基本写入一致性。具体而言,可以使用WriteMode来控制写入模式,例如:
builder.withWriteConfig(HoodieWriteConfig.newBuilder() .withPath("/path/to/hudi/table") .withSchema(SCHEMA) .withParallelism(2, 2) .forTable("my_hudi_table") .withWriteConcurrency(2) .withWriteBufferLimitBytes(1024 * 1024 * 1024) .withIndexConfig( HoodieIndexConfig.newBuilder() .withIndexType(HoodieIndex.IndexType.BLOOM) .build()) .withAutoCommit(false) .withWriteStatusClass(MetadataMergeWriteResult.class) .withWriteBootstrapIndex(true) .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(SCHEMA.getClass()) .build()) .withEmbeddedTimelineServerEnabled(true) .withBulkInsertSortMode(BulkInsertSortMode.GLOBAL_SORT) .withProps(props) .writeConcurrency(2) .insertSplitSize(1) // 设置初始写入批次大小 .bulkInsertSortMode(BulkInsertSortMode.GLOBAL_SORT) .withWriteMode(WriteMode.UPSERT) .build()); 在上述代码中,我们将写入模式设置为UPSERT,表示如果记录键相同,则更新现有记录。此外,还设置了初始写入批次大小、并行度、索引类型等参数。
总的来说,使用Flink Sink写入Hudi表可以很方便地实现数据的同步和管理。需要注意的是,在使用过程中,需要根据具体的业务场景和数据特点进行相应的配置和优化,以提高数据的处理效率和准确性。
以下是一个将Flink数据写入Hudi的示例代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class FlinkToHudi {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink");
DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
DataStream<HoodieRecord> hoodieStream = kafkaStream.map(new MapFunction<String, HoodieRecord>() {
@Override
public HoodieRecord map(String s) throws Exception {
// 在这里构造HoodieRecord对象
HoodieRecord record = new HoodieRecord<>(key, value);
return record;
}
});
String basePath = "hdfs://localhost:9000/user/hudi/test";
Configuration hadoopConf = new Configuration();
org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf);
HoodieWriteClient writeClient = new HoodieWriteClient(fs, basePath);
List<WriteStatus> writeStatuses = new ArrayList<>();
hoodieStream.foreachBatch((records, aLong) -> {
if (!records.isEmpty()) {
writeStatuses.addAll(writeClient.upsert(new org.apache.hudi.common.model.HoodieJavaRDD<>(records.rdd()), aLong.toString()));
}
});
env.execute("Flink to Hudi");
}
}
在上面的代码中,我们首先使用FlinkKafkaConsumer从Kafka中读取数据,然后将数据转换为HoodieRecord对象。接着,我们使用HoodieWriteClient对象将HoodieRecord写入Hudi表中。这里需要注意的是,我们使用foreachBatch方法将每个批次的数据写入Hudi表中,以提高写入的效率。
在使用该示例代码时,需要先将Hudi和相关依赖库添加到项目中,并根据实际情况修改代码中的配置参数。
楼主你好,看看这个: -- 创建hudi表 set execution.result-mode=tableau; CREATE TABLE hudi_users5(id BIGINT PRIMARY KEY NOT ENFORCED,name STRING,birthday TIMESTAMP(3),ts TIMESTAMP(3),partition
VARCHAR(20) ) PARTITIONED BY (partition
) WITH ('connector' = 'hudi','table.type' = 'MERGE_ON_READ','path' = 'hdfs://nameservice1/tmp/hudi' ); -- 插入测试数据 INSERT INTO hudi_users5 VALUES(1,'Danny',TIMESTAMP '1992-01-01 00:00:01',TIMESTAMP '2021-01-01 00:00:01','par1'),(2,'Stephen',TIMESTAMP '1991-08-02 00:00:01',TIMESTAMP '1970-01-01 00:00:02','par1'),(3,'Julian',TIMESTAMP '1995-10-01 00:00:01',TIMESTAMP '1970-01-01 00:00:03','par2'),(4,'Fabian',TIMESTAMP '1995-08-05 00:00:01',TIMESTAMP '1970-01-01 00:00:04','par2'),(5,'Sophia',TIMESTAMP '1996-07-08 00:00:01',TIMESTAMP '1970-01-01 00:00:05','par3'),(6,'Emma',TIMESTAMP '1970-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:06','par3'),(7,'Bob',TIMESTAMP '1956-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:07','par4'),(8,'Han',TIMESTAMP '1988-01-01 00:00:01',TIMESTAMP '1970-01-01 00:00:08','par4');
以下是使用 Flink 将数据 Sink 到 Hudi 的示例代码:
Copy code
// Hudi 配置
Configuration hudiConfig = HoodieWriteConfig.newBuilder()
.withPath("/path/to/hudi/table")
.withSchema(HoodieAvroUtils.createHoodieWriteSchema(dataSchema))
.withTableName("test")
.withBulkInsertParallelism(3)
.withPayloadClassName("org.apache.hudi.avro.model.DefaultHoodieRecordPayload")
.forTable("test")
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.BLOOM)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass("org.apache.hudi.avro.model.HoodieAvroPayload")
.build())
.withAutoCommit(false)
.withProps(props)
.build();
// 创建 Flink 流式计算环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建 Flink 数据流
DataStream<Row> dataStream = env
.addSource(new FlinkKafkaConsumer<>(kafkaTopic, new SimpleStringSchema(), properties))
.returns(Types.STRING)
.flatMap((String line, Collector<Row> out) -> {
// 解析输入数据并转换成 Row 类型
Row row = new Row(3);
row.setField(0, ...);
row.setField(1, ...);
row.setField(2, ...);
out.collect(row);
})
.returns(Types.ROW);
// 将数据 Sink 到 Hudi
dataStream.addSink(new HoodieSinkFunction<>(hudiConfig, new UpsertHandler()))
需要注意的是,这只是一个简单的示例代码,具体的实现需要根据实际场景进行调整。其中 HoodieSinkFunction 是 Flink 的自定义 Sink 函数,用于将数据写入到 Hudi 表中。UpsertHandler 则是处理数据的函数,根据实际情况进行修改。
以下是使用 Flink 将数据写入 Apache Hudi 的一个简单的示例。假设需要将批量的 JSON 数据写入 Hudi 中,可以按照以下步骤进行:
引入必要的依赖: org.apache.flink flink-hudi ${flink.version} 创建 Flink 批处理作业,并读取 JSON 数据: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 读取 JSON 数据 DataSource dataSource = env.readTextFile("path/to/json/data"); 使用 JsonNodeDeserializationSchema 将 JSON 数据转换为 JsonNode 对象,并使用 HoodieFlinkWriteableTable 定义 Hudi 表的基本信息: // 将 JSON 数据转换为 JsonNode 对象 DataStream jsonDataStream = dataSource.map(new MapFunction<String, JsonNode>() { @Override public JsonNode map(String value) throws Exception { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readValue(value, JsonNode.class); } });
// 定义 Hoodie 表的基本信息 Configuration conf = new Configuration(); conf.set(HoodieWriteConfig.TABLE_NAME, "tableName"); conf.set(HoodieWriteConfig.RECORD_KEY_FIELD, "_row_key"); conf.set(HoodieWriteConfig.PARTITION_PATH_FIELD, "date");
HoodieFlinkWriteableTable hoodieTable = HoodieFlinkWriteableTable.newBuilder() .withTableName("tableName") .withRecordKeyField("_row_key") .withPartitionPathField("date") .withPreCombineField(null) .withOperation(UPSERT) .withWriteConfig(HoodieWriteConfig.newBuilder().withConfiguration(conf).build()) .build(); 将数据写入 Hudi 中: // 定义 Flink 的 SinkFunction HoodieFlinkSink hoodieSink = HoodieFlinkSink.newWriteStreamer(hoodieTable, env);
// 写入数据到 Hudi jsonDataStream.addSink(hoodieSink); 完整代码示例如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 读取 JSON 数据 DataSource dataSource = env.readTextFile("path/to/json/data");
// 将 JSON 数据转换为 JsonNode 对象 DataStream jsonDataStream = dataSource.map(new MapFunction<String, JsonNode>() { @Override public JsonNode map(String value) throws Exception { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readValue(value, JsonNode.class); } });
// 定义 Hoodie 表的基本信息 Configuration conf = new Configuration(); conf.set(HoodieWriteConfig.TABLE_NAME, "tableName"); conf.set(HoodieWriteConfig.RECORD_KEY_FIELD, "_row_key"); conf.set(HoodieWriteConfig.PARTITION_PATH_FIELD, "date");
HoodieFlinkWriteableTable hoodieTable = HoodieFlinkWriteableTable.newBuilder() .withTableName("tableName") .withRecordKeyField("_row_key") .withPartitionPathField("date") .withPreCombineField(null) .withOperation(UPSERT) .withWriteConfig(HoodieWriteConfig.newBuilder().withConfiguration(conf).build()) .build();
// 定义 Flink 的 SinkFunction HoodieFlinkSink hoodieSink = HoodieFlinkSink.newWriteStreamer(hoodieTable, env);
// 写入数据到 Hudi jsonDataStream.addSink(hoodieSink);
env.execute("Flink Hudi Example"); 需要注意的是,上述代码仅为演示示例,实际应用中需要根据具体情况进行调整和优化。
示例:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.hudi.HoodieFlinkWriteConfig;
import org.apache.flink.streaming.connectors.hudi.HoodieSink;
import org.apache.flink.streaming.connectors.hudi.HoodieWriteHandleFactory;
import org.apache.flink.streaming.connectors.hudi.HoodieWriteable;
import org.apache.flink.streaming.connectors.hudi.HoodieWriteableTable;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import java.util.Properties;
public class FlinkHudiDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 构造 HoodieFlinkWriteConfig
Properties props = new Properties();
props.setProperty(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
props.setProperty(FlinkOptions.PATH.key(), "/path/to/hudi/table");
props.setProperty(FlinkOptions.TABLE_NAME.key(), "hudi_table_name");
props.setProperty(FlinkOptions.PRECOMBINE_FIELD.key(), "timestamp");
props.setProperty(FlinkOptions.RECORD_KEY_FIELD.key(), "id");
props.setProperty(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition_path");
HoodieFlinkWriteConfig writeConfig = HoodieFlinkWriteConfig.newBuilder()
.withWritePayloadRecordKey("id")
.withPath("/path/to/hudi/table")
.withTableName("hudi_table_name")
.withPreCombineField("timestamp")
.withProps(props)
.build();
// 构造 HoodieSink
HoodieWriteHandleFactory<HoodieWriteableRecord> handleFactory = new HoodieWriteHandleFactory<HoodieWriteableRecord>() {
@Override
public HoodieWriteableTable<HoodieWriteableRecord> create(
HoodieWriteClient client, String instantTime) {
return new HoodieWriteableTable<>(client, instantTime, HoodieWriteableRecord.class);
}
};
HoodieWriteable<HoodieWriteableRecord> hoodieWriteable = new HoodieWriteable<>(handleFactory);
HoodieSink<HoodieWriteableRecord> hoodieSink = new HoodieSink<>(writeConfig, hoodieWriteable,
new HoodieSinkFunction.HoodieSinkFunctionFactory<>(), new StreamWriteOperatorFactory<>());
// 数据源
env.addSource(new MyDataSource())
.map(new MyMapper())
.addSink(hoodieSink);
env.execute("Flink Hudi Demo");
}
}
通过HoodieFlinkWriteConfig来配置写入Hudi的相关参数,其中包括表路径、表名称、表类型、记录主键、分区字段等。通过HoodieSink将数据写入Hudi。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。