
Is there an open-source ETL tool for Flink that can read from Kafka and write to a ClickHouse table?

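Is there an open-source ETL tool for Flink that can read from Kafka and write to a ClickHouse table, applying operations such as filtering and renaming fields along the way?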

wenti 2023-02-07 14:13:04
2 answers
  • Flink itself can serve as the open-source ETL tool here: a Flink job can read data from Kafka and then write it into a ClickHouse table.

    2023-02-09 09:30:09
  • Read the data from Kafka, run it through ETL, and write it to ClickHouse over JDBC.
    • Define the POJO class:

    public class Student {
        private int id;
        private String name;
        private String password;
        private int age;
        private String date;
        // constructors, setters and getters omitted
    }
    
    • Full code:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // ############### Define the Kafka source ##############
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "metric-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "latest");
    
    tableEnv.connect(new Kafka().version("0.10")
            .topic("student").properties(props).startFromLatest())
            .withFormat(new Json().deriveSchema())
            .withSchema(new Schema().field("id", Types.INT())
                                    .field("name", Types.STRING())
                                    .field("password", Types.STRING())
                                    .field("age", Types.INT())
                                    .field("date", Types.STRING()))
            .inAppendMode()
            .registerTableSource("kafkaTable");
    Table result = tableEnv.sqlQuery("SELECT * FROM " +  "kafkaTable");
    
    // ############### Define the ClickHouse JDBC sink ##############
    String targetTable = "clickhouse";
    TypeInformation[] fieldTypes = {BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
    TableSink jdbcSink =  JDBCAppendTableSink.builder()
                          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
                          .setDBUrl("jdbc:clickhouse://localhost:8123")
                          .setQuery("insert into student_local(id, name, password, age, date) values(?, ?, ?, ?, ?)")
                          .setParameterTypes(fieldTypes)
                          .setBatchSize(15)
                          .build();
    
    tableEnv.registerTableSink(targetTable,new String[]{"id","name", "password", "age", "date"}, new TypeInformation[]{Types.INT(), Types.STRING(), Types.STRING(), Types.INT(), Types.STRING()}, jdbcSink);
    
    result.insertInto(targetTable);
    env.execute("Flink add sink");
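
The job above forwards every column unchanged with `SELECT * FROM kafkaTable`, while the question also asks about filtering and renaming fields. In this setup that would most naturally happen in the query string passed to `tableEnv.sqlQuery(...)`. The following is only a minimal sketch of assembling such a query in plain Java: the class `ProjectionSql`, the output names `student_name` and `event_date`, and the `age >= 18` condition are all hypothetical and not part of the original answer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: builds a SELECT that drops, renames and filters columns. */
public class ProjectionSql {

    /**
     * @param table   name of the registered source table, e.g. "kafkaTable"
     * @param columns source column -> output column, in output order;
     *                columns that are not listed are dropped
     * @param filter  boolean SQL expression for the WHERE clause, or null for none
     */
    public static String build(String table, Map<String, String> columns, String filter) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        StringJoiner select = new StringJoiner(", ");
        for (Map.Entry<String, String> e : columns.entrySet()) {
            String source = quote(e.getKey());
            String target = quote(e.getValue());
            // Emit "src AS dst" only when the name actually changes.
            select.add(source.equals(target) ? source : source + " AS " + target);
        }
        String sql = "SELECT " + select + " FROM " + table;
        if (filter != null && !filter.trim().isEmpty()) {
            sql += " WHERE " + filter;
        }
        return sql;
    }

    // Backticks quote identifiers in Flink SQL; this matters for the
    // `date` column, because DATE is a reserved keyword.
    private static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "id");
        columns.put("name", "student_name");   // renamed (hypothetical target name)
        columns.put("age", "age");
        columns.put("date", "event_date");     // renamed (hypothetical target name)
        // "password" is not listed, so it is dropped from the output.
        System.out.println(build("kafkaTable", columns, "`age` >= 18"));
    }
}
```

The resulting string would replace the `SELECT *` query in `tableEnv.sqlQuery(...)`. The sink side would then need to match: the `insert into ... values(...)` statement, the `fieldTypes` array, and the field names and types given to `registerTableSink` would all have to list the same four columns in the same order.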
    
    • POM:
    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-java</artifactId>
         <version>${flink.version}</version>
         <!--<scope>provided</scope>-->
     </dependency>
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
     </dependency>
    
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
         <!-- <scope>provided</scope>-->
     </dependency>          
     <dependency>
         <groupId>ru.yandex.clickhouse</groupId>
         <artifactId>clickhouse-jdbc</artifactId>
         <version>0.2</version>
     </dependency>
    
     <dependency>
         <groupId>org.apache.httpcomponents</groupId>
         <artifactId>httpcore</artifactId>
         <version>4.4.4</version>
     </dependency>
    
     <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>19.0</version>
     </dependency>
    
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
     </dependency>
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-json</artifactId>
         <version>${flink.version}</version>
     </dependency>
    
     <!-- Either... -->
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
     </dependency>
    
     <!-- Add connector dependencies here. They must be in the default scope
         (compile). -->
     <!-- this is for kafka consuming -->
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
     </dependency>
    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
    </dependency>
    
    2023-02-07 22:59:37
