Flink Integration with Iceberg in Practice

Summary: hands-on examples of integrating Flink (and Spark) with Apache Iceberg, covering CDC ingestion, Kafka ingestion, and small-file compaction.

Flink CDC PostgreSQL sink to Iceberg with the DataStream API

package com.zjyg.iceberg.flink;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.Schema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Pg2Iceberg {
    private static final Schema SCHEMA =
            new Schema(
                    Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()),
                    Types.NestedField.optional(3, "age", Types.IntegerType.get()),
                    Types.NestedField.optional(4, "sex", Types.StringType.get())
            );
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(parameterTool.get("checkpoint_base_dir")+"/"+parameterTool.get("catalog_name")+"."+parameterTool.get("iceberg_db_name")+"."+parameterTool.get("iceberg_tb_name"));
        checkpointConfig.setCheckpointInterval(60 * 1000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);
        checkpointConfig.setTolerableCheckpointFailureNumber(10);
        checkpointConfig.setCheckpointTimeout(120 * 1000L);
        DataStreamSource<RowData> src = env.addSource(getPgCdc(parameterTool));
        icebergSink_hadoop(src, parameterTool);
        env.execute(parameterTool.get("app_name"));
    }
    private static void icebergSink_hadoop(DataStream<RowData> src, ParameterTool tool) {
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hadoop");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse_base_dir")+"/"+tool.get("catalog_name"));
        CatalogLoader catalogLoader =
                CatalogLoader.hadoop(tool.get("catalog_name"), new Configuration(), properties);
        icebergSink(src, tool, catalogLoader);
    }
    private static void icebergSink(DataStream<RowData> input, ParameterTool tool, CatalogLoader loader) {
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("iceberg_db_name")), tool.get("iceberg_tb_name"));
        Table table;
        if (catalog.tableExists(identifier)) {
            table = catalog.loadTable(identifier);
        } else {
            table =
                    catalog.buildTable(identifier, SCHEMA)
                            .withPartitionSpec(PartitionSpec.unpartitioned())
                            .create();
        }
        TableOperations operations = ((BaseTable) table).operations();
        TableMetadata metadata = operations.current();
        operations.commit(metadata, metadata.upgradeToFormatVersion(2));
        TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);
        FlinkSink.forRowData(input)
                .table(table)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id"))
                .writeParallelism(1)
                .build();
    }
    private static SourceFunction<RowData> getPgCdc(ParameterTool tool) {
        TableSchema schema =
                TableSchema.builder()
                        .add(TableColumn.physical("id", DataTypes.INT()))
                        .add(TableColumn.physical("name", DataTypes.STRING()))
                        .add(TableColumn.physical("age", DataTypes.INT()))
                        .add(TableColumn.physical("sex", DataTypes.STRING()))
                        .build();
        RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
        DebeziumDeserializationSchema deserialer =
                new RowDataDebeziumDeserializeSchema(
                        rowType,
                        createTypeInfo(schema.toRowDataType()),
                        (rowData, rowKind) -> {},
                        ZoneId.of("Asia/Shanghai"));
        Properties properties = new Properties();
        properties.setProperty("decimal.handling.mode", "string");   // convert decimal and similar types to string
        DebeziumSourceFunction<RowData> sourceFunction =
                PostgreSQLSource.<RowData>builder()
                        .hostname(tool.get("db_host"))
                        .port(Integer.parseInt(tool.get("db_port")))
                        .database(tool.get("db_name"))
                        .schemaList(tool.get("schema_name"))
                        .tableList(tool.get("schema_name") + "." + tool.get("tb_name"))
                        .username(tool.get("db_user"))
                        .password(tool.get("db_user_pwd"))
                        .decodingPluginName("pgoutput")
                        .slotName(tool.get("SLOTNAME4"))
                        .deserializer(deserialer)
                        .debeziumProperties(properties)
                        .build();
        return sourceFunction;
    }
    private static TypeInformation<RowData> createTypeInfo(DataType producedDataType) {
        final DataType internalDataType =
                DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
        return (TypeInformation<RowData>)
                TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
    }
}
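
A variant worth noting for CDC data: on Iceberg releases whose FlinkSink builder exposes an upsert switch (roughly 0.12 and later, an assumption about the target version), the UPDATE/DELETE events can be applied by equality on "id" instead of being appended. The sketch below is illustrative only; appendUpsertSink is a made-up helper name, and the terminal call is append() here while older builders end with build() as in the job above.

import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class UpsertSinkSketch {
    // Sketch: wire the CDC RowData stream into Iceberg in upsert mode, deduplicating on "id".
    // Assumes an Iceberg version that supports Builder#upsert and a format-v2 table,
    // as created in Pg2Iceberg above.
    public static void appendUpsertSink(DataStream<RowData> input, TableLoader tableLoader) {
        FlinkSink.forRowData(input)
                .tableLoader(tableLoader)
                .equalityFieldColumns(Arrays.asList("id"))
                .upsert(true)
                .writeParallelism(1)
                .append();
    }
}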

Flink Kafka sink to Iceberg with the DataStream API

import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.coomia.datalake.kafka.KafkaUtils;
public class FlinkWriteIcebergTest {
  public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "root");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(5000L);
    env.setParallelism(1);
    // iceberg catalog identification.
    Configuration conf = new Configuration();
    Catalog catalog = new HadoopCatalog(conf);
    // iceberg table identification.
    TableIdentifier name =
        TableIdentifier.of("default", "iceberg-tb-" + System.currentTimeMillis());
    // iceberg table schema identification.
    Schema schema = new Schema(Types.NestedField.required(1, "uid", Types.StringType.get()),
        Types.NestedField.required(2, "eventTime", Types.LongType.get()),
        Types.NestedField.required(3, "eventid", Types.StringType.get()),
        Types.NestedField.optional(4, "uuid", Types.StringType.get()),
        Types.NestedField.required(5, "ts", Types.TimestampType.withoutZone()));
    // iceberg table partition identification.
    // PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uid", 5).build();
    PartitionSpec spec = PartitionSpec.unpartitioned();
    // identify using orc format as storage.
    Map<String, String> props =
        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());
    Table table = null;
    // create an iceberg table if not exists, otherwise, load it.
    if (!catalog.tableExists(name))
      table = catalog.createTable(name, schema, spec, props);
    else
      table = catalog.loadTable(name);
    String topic = "arkevent";
    String servers = "kafka:9092";
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topic,
        new SimpleStringSchema(), KafkaUtils.consumeProps(servers, "flink-consumer"));
    consumer.setStartFromEarliest();
    SingleOutputStreamOperator<RowData> dataStream =
        env.addSource(consumer).map(new MapFunction<String, RowData>() {
          @Override
          public RowData map(String value) throws Exception {
            JSONObject dataJson = JSON.parseObject(value);
            GenericRowData row = new GenericRowData(5);
            row.setField(0, StringData.fromBytes(dataJson.getString("uid").getBytes()));
            row.setField(1, dataJson.getLong("eventTime"));
            row.setField(2, StringData.fromBytes(dataJson.getString("eventid").getBytes()));
            row.setField(3, StringData.fromBytes(dataJson.getString("uuid").getBytes()));
            row.setField(4, TimestampData.fromEpochMillis(dataJson.getLong("eventTime")));
            return row;
          }
        });
    // uid is used for job restart or something when using savepoint.
    dataStream.uid("flink-consumer");
    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
    // sink data to iceberg table
    FlinkSink.forRowData(dataStream).table(table).tableLoader(tableLoader).writeParallelism(1)
        .overwrite(true)
        .build();
    //read and write to file.
    DataStream<RowData> batchData = FlinkSource.forRowData().env(env).tableLoader(tableLoader).build();
    batchData.print();
    batchData.writeAsCsv(tableLoader.loadTable().location().concat("/out/out.csv"), WriteMode.OVERWRITE, "\n", " ");
    // Execute the program.
    env.execute("Test Iceberg DataStream");
  }
}
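
The Kafka example above imports com.coomia.datalake.kafka.KafkaUtils, which is not shown in the article. Below is a hypothetical sketch of what its consumeProps helper could look like, assuming it only assembles the consumer Properties handed to FlinkKafkaConsumer; the class body is a stand-in, not the original code.

import java.util.Properties;

public class KafkaUtils {
    // Hypothetical helper: build consumer properties for FlinkKafkaConsumer.
    // consumer.setStartFromEarliest() in the job above overrides the offset reset behavior anyway.
    public static Properties consumeProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}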

Spark Kafka sink to Iceberg (Structured Streaming)

package com.zjyg.iceberg.main
import com.zjyg.iceberg.util.TimeUtil
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataTypes
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.IcebergSpark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
object Kafka2IcebergV3 {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String)
  import org.apache.spark.sql.functions._
  def main(args: Array[String]): Unit = {
    if(args.length < 3) {
      System.err.println(
        s"""
           |Usage: Kafka2IcebergV3 <brokers> <topics> <groupId>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupId> is a kafka consumer groupId
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointDir = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/spark"
    val checkpointDir_iceberg = "hdfs:///user/bigdata/sparkStreamingCheckpoint/Kafka2Iceberg_opentsdb-datav3/icebergtb"
    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.iceberg.type","hadoop")
        .config("spark.sql.catalog.iceberg",classOf[SparkCatalog].getName)
        .config("spark.sql.catalog.iceberg.warehouse","hdfs:///warehouse/tablespace/external/iceberg")
        .appName(this.getClass.getSimpleName)
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir)
      // IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")
      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name","kafka")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()
      val data = lines.map(
        row => row.getAs[String]("value").toString()
      ).map(s => getLogs(s)).toDF()
      println("------PrintSchema data-------")
      data.printSchema()
      val tableIdentifier: String = "hdfs:///warehouse/tablespace/external/iceberg/iceberg_test/opentsdb_logsv3"
      val query = data.writeStream.format("iceberg").outputMode("append").trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)).option("path", tableIdentifier).option("checkpointLocation", checkpointDir_iceberg).start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }
  def getTags(tags:String):Tags ={
    var result = Tags("",0,"","","","")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    return result
  }
  def getLogs(logs:String):Logs = {
    var v_tags = new Tags("",0,"","","","")
    var result = new Logs("","",new Timestamp(0L),0,"","",v_tags,"","","",0,"")
    try {
      val json=JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val ts_l = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(ts_l)
      val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(ts_l)
      val step = json.getInteger("step")
      val l_value = json.getString("value")
      val countertype = json.getString("counterType")
      val j_tags = json.getString("tags")
      val tags = getTags(j_tags)
      val l_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric,endpoint,ts,step,l_value,countertype,tags,l_type,kafka,nodata,dt,hour)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        System.exit(1)
      }
    }
    return result
  }
}

Flink Iceberg small-file compaction

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.actions.Actions;
import java.util.HashMap;
import java.util.Map;
public class FlinkCompaction {
    public static void main(String[] args) throws Exception {
        ParameterTool tool = ParameterTool.fromArgs(args);
        Map<String, String> properties = new HashMap<>();
        properties.put("type", "iceberg");
        properties.put("catalog-type", "hive");
        properties.put("property-version", "1");
        properties.put("warehouse", tool.get("warehouse"));
        properties.put("uri", tool.get("uri"));
        if (tool.has("oss.endpoint")) {
            properties.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
            properties.put("oss.endpoint", tool.get("oss.endpoint"));
            properties.put("oss.access.key.id", tool.get("oss.access.key.id"));
            properties.put("oss.access.key.secret", tool.get("oss.access.key.secret"));
        }
        CatalogLoader loader =
                CatalogLoader.hive(tool.get("catalog"), new Configuration(), properties);
        Catalog catalog = loader.loadCatalog();
        TableIdentifier identifier =
                TableIdentifier.of(Namespace.of(tool.get("db")), tool.get("table"));
        Table table = catalog.loadTable(identifier);
        /**
         * Compact small files - this is the core of the job.
         */
        Actions.forTable(table)
                .rewriteDataFiles()
                .maxParallelism(5)
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
}
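
The rewriteDataFiles() call above considers the whole table. When the table is partitioned, the rewrite can be limited to a single partition with a filter expression so each compaction run touches less data. A minimal sketch, assuming a "dt" partition column (an assumption for illustration; the table loaded above may be partitioned differently):

import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.actions.Actions;

public class PartitionScopedCompaction {
    // Rewrite only the small files belonging to one "dt" partition value.
    public static void compactPartition(Table table, int dt) {
        Actions.forTable(table)
                .rewriteDataFiles()
                .filter(Expressions.equal("dt", dt))
                .targetSizeInBytes(128 * 1024 * 1024)
                .execute();
    }
}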

Spark Iceberg small-file compaction

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.Actions;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
/**
 * @author: zhushang
 * @create: 2021-04-02 14:30
 */
public class SparkCompaction {
    public static void main(String[] args) {
        TableIdentifier identifier = TableIdentifier.of(Namespace.of("db"), "table");
        Map<String, String> config = new HashMap<>();
        config.put("type", "iceberg");
        config.put("catalog-type", "hive");
        config.put("property-version", "1");
        config.put("warehouse", "warehouse");
        config.put("uri", "thrift://local:9083");
        config.put("io-impl", "org.apache.iceberg.aliyun.oss.OSSFileIO");
        config.put("oss.endpoint", "https://xxx.aliyuncs.com");
        config.put("oss.access.key.id", "key");
        config.put("oss.access.key.secret", "secret");
        sparkSession();
        HiveCatalog hiveCatalog = new HiveCatalog(new Configuration());
        hiveCatalog.initialize("iceberg_hive_catalog", config);
        Table table = hiveCatalog.loadTable(identifier);
        Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(128 * 1024 * 1024).execute();
        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            table.expireSnapshots().expireOlderThan(snapshot.timestampMillis()).commit();
        }
    }
    private static void sparkSession() {
        SparkSession.builder()
                .master("local[*]")
                .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
                .config("spark.hadoop." + METASTOREURIS.varname, "localhost:9083")
                .config("spark.sql.warehouse.dir", "warehouse")
                .config("spark.executor.heartbeatInterval", "100000")
                .config("spark.network.timeoutInterval", "100000")
                .enableHiveSupport()
                .getOrCreate();
    }
}
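
On recent Iceberg releases with Spark 3 and the Iceberg SQL extensions enabled, the same maintenance can also be expressed through Iceberg's stored procedures instead of the Actions API. A sketch under those assumptions; the catalog name "iceberg", the table "db.table", the metastore URI, and the cutoff timestamp are placeholders matching the example above, not values from a real deployment:

import org.apache.spark.sql.SparkSession;

public class SparkProcedureCompaction {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .config("spark.sql.extensions",
                        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.iceberg.type", "hive")
                .config("spark.sql.catalog.iceberg.uri", "thrift://localhost:9083")
                .getOrCreate();
        // Compact small data files toward the table's target file size.
        spark.sql("CALL iceberg.system.rewrite_data_files(table => 'db.table')");
        // Expire snapshots older than the given timestamp so old data/metadata files can be cleaned up.
        spark.sql("CALL iceberg.system.expire_snapshots(table => 'db.table', older_than => TIMESTAMP '2021-04-01 00:00:00')");
        spark.stop();
    }
}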

Spark OpenTSDB logs sink to Iceberg

package com.zjyg.iceberg.spark
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import com.alibaba.fastjson.JSON
import com.zjyg.iceberg.util.TimeUtil
import org.apache.iceberg.spark.SparkCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object OpentsdbLog2Iceberg {
  case class Tags(entrust_prop: String, entrust_status: Int, exchange_type: String, op_entrust_way: String, ywlx: String, ywzl: String)
  case class Logs(metric: String, endpoint: String, ts: Timestamp, step: Int, l_value: String, countertype: String, tags: Tags, l_type:String, kafka: String, nodata: String, dt: Int, hour: String,row_info: String)
  def main(args: Array[String]): Unit = {
    if(args.length < 9) {
      System.err.println(
        s"""
           |Usage: OpentsdbLog2Iceberg <brokers> <topics> <groupId> <checkpointBaseDir> ...
           |  <1. brokers> is a list of one or more Kafka brokers
           |  <2. topics> is a list of one or more kafka topics to consume from
           |  <3. groupId> is a kafka consumer groupId
           |  <4. checkpointBaseDir> is a checkpoint dir for job
           |  <5. warehouseBaseDir> is a warehouse base dir for iceberg
           |  <6. catalogName> is a catalogName for iceberg
           |  <7. dbName> is a DatabaseName for iceberg
           |  <8. tbName> is a TableName for iceberg
           |  <9. kafkaServiceName> is a Kafka Service Name
        """.stripMargin)
      System.exit(1)
    }
    val brokers = args(0)
    val topics = args(1)
    val groupId = args(2)
    val checkpointBaseDir = args(3) // hdfs:///user/bigdata/sparkStreamingCheckpoint
    val warehouseBaseDir = args(4)  // hdfs:///warehouse/tablespace/external
    val catalogName = args(5)       // iceberg
    val dbName = args(6)            // iceberg_test
    val tbName = args(7)            // opentsdb_logs
    val kafkaServiceName = args(8)  // hdp-kafka
    val checkpointDir_spark = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/spark"
    val checkpointDir_iceberg = s"${checkpointBaseDir}/${catalogName}.${dbName}.${tbName}/iceberg"
    try {
      val spark = SparkSession.builder()
        .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config(s"spark.sql.catalog.${catalogName}.type","hadoop")
        .config(s"spark.sql.catalog.${catalogName}",classOf[SparkCatalog].getName)
        .config(s"spark.sql.catalog.${catalogName}.warehouse",s"${warehouseBaseDir}/${catalogName}")
        .getOrCreate()
      spark.sparkContext.setCheckpointDir(checkpointDir_spark)
      System.setProperty("java.security.auth.login.config", "./kafka_jaas.conf")
      System.setProperty("java.security.krb5.conf", "./krb5.conf")
      import spark.implicits._
      val lines = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss","false")
        .option("kafka.security.protocol","SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism","GSSAPI")
        .option("kafka.sasl.kerberos.service.name",s"${kafkaServiceName}")
        .option("kafka.group.id",groupId)
        .option("subscribe",topics)
        .load()
        .withColumn("value", $"value".cast("string"))
        .filter($"value".isNotNull)
      println("------PrintSchema lines-------")
      lines.printSchema()
      val data = lines.map(
        row => getLogs(row.getAs[Timestamp]("timestamp"),row.getAs[String]("value").toString())
      ).toDF().repartition($"dt", $"hour")
      println("------PrintSchema data-------")
      data.printSchema()
      val tableIdentifier: String = s"${warehouseBaseDir}/${catalogName}/${dbName}/${tbName}"
      val query = data.writeStream
        .format("iceberg")
        .partitionBy("dt","hour")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", tableIdentifier)
        .option("fanout-enabled", "true")
        .option("checkpointLocation", checkpointDir_iceberg)
        .start()
      query.awaitTermination()
      spark.close()
    } catch {
      case e: Exception => {
        System.err.println("exit. Exception is:" + e)
        System.exit(1)
      }
    }
  }
  def getTags(tags:String):Tags ={
    var result = Tags("",0,"","","","")
    try {
      val tags_columns = tags.split(',')
      val entrust_prop = (tags_columns(0).split('='))(1).toString
      val entrust_status = (tags_columns(1).split('='))(1).toInt
      val exchange_type = (tags_columns(2).split('='))(1).toString
      val op_entrust_way = (tags_columns(3).split('='))(1).toString
      val ywlx = (tags_columns(4).split('='))(1).toString
      val ywzl = (tags_columns(5).split('='))(1).toString
      result = Tags(entrust_prop, entrust_status, exchange_type, op_entrust_way, ywlx, ywzl)
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
      }
    }
    return result
  }
  def getLogs(kafkaTimestamp:Timestamp,logs:String):Logs = {
    var tags = new Tags("",0,"","","","")
    var result = new Logs("","",new Timestamp(0L),0,"","",tags,"","","",19700101,"00","")
    try {
      val json=JSON.parseObject(logs)
      val metric = json.getString("metric")
      val endpoint = json.getString("endpoint")
      val v_ts = json.getInteger("timestamp") * 1000L
      val ts = new Timestamp(v_ts)
      val (dt:Int, hour:String) = TimeUtil.getTsYmdHour(v_ts)
      val step = json.getInteger("step")
      val v_value = json.getString("value")
      val countertype = json.getString("counterType")
      val tags = getTags(json.getString("tags"))
      val v_type = json.getString("type")
      val kafka = json.getString("kafka")
      val nodata = json.getString("nodata")
      result = new Logs(metric,endpoint,ts,step,v_value,countertype,tags,v_type,kafka,nodata,dt,hour,"")
    } catch {
      case e: Exception => {
        System.err.println("Exception is:" + e)
        // System.exit(1)
        val (dt,hour) = TimeUtil.tsToDateHour(kafkaTimestamp)
        result = new Logs("","",new Timestamp(0L),0,"","",tags,"","","",-dt,hour,logs)
      }
    }
    return result
  }
}


